Add ssh public key support to the asuswrt component (#2287)
The pexpect.pxssh module has support for using public key authentication. [1] This commit adds support for leveraging that and establishing a ssh connection with a public key instead of a password. [1] http://pexpect.readthedocs.io/en/stable/api/pxssh.html#pexpect.pxssh.pxssh.login
This commit is contained in:
parent
6dcf3682df
commit
1381984b77
2 changed files with 84 additions and 3 deletions
|
@ -56,9 +56,13 @@ _IP_NEIGH_REGEX = re.compile(
|
|||
def get_scanner(hass, config):
|
||||
"""Validate the configuration and return an ASUS-WRT scanner."""
|
||||
if not validate_config(config,
|
||||
{DOMAIN: [CONF_HOST, CONF_USERNAME, CONF_PASSWORD]},
|
||||
{DOMAIN: [CONF_HOST, CONF_USERNAME]},
|
||||
_LOGGER):
|
||||
return None
|
||||
elif CONF_PASSWORD not in config[DOMAIN] and \
|
||||
'pub_key' not in config[DOMAIN]:
|
||||
_LOGGER.error("Either a public key or password must be provided")
|
||||
return None
|
||||
|
||||
scanner = AsusWrtDeviceScanner(config[DOMAIN])
|
||||
|
||||
|
@ -75,7 +79,8 @@ class AsusWrtDeviceScanner(object):
|
|||
"""Initialize the scanner."""
|
||||
self.host = config[CONF_HOST]
|
||||
self.username = str(config[CONF_USERNAME])
|
||||
self.password = str(config[CONF_PASSWORD])
|
||||
self.password = str(config.get(CONF_PASSWORD))
|
||||
self.pub_key = str(config.get('pub_key'))
|
||||
self.protocol = config.get('protocol')
|
||||
self.mode = config.get('mode')
|
||||
|
||||
|
@ -126,9 +131,16 @@ class AsusWrtDeviceScanner(object):
|
|||
def ssh_connection(self):
|
||||
"""Retrieve data from ASUSWRT via the ssh protocol."""
|
||||
from pexpect import pxssh
|
||||
|
||||
try:
|
||||
ssh = pxssh.pxssh()
|
||||
ssh.login(self.host, self.username, self.password)
|
||||
if self.pub_key:
|
||||
ssh.login(self.host, self.username, ssh_key=self.pub_key)
|
||||
elif self.password:
|
||||
ssh.login(self.host, self.username, self.password)
|
||||
else:
|
||||
_LOGGER.error('No password or public key specified')
|
||||
return('', '', '')
|
||||
ssh.sendline(_IP_NEIGH_CMD)
|
||||
ssh.prompt()
|
||||
neighbors = ssh.before.split(b'\n')[1:-1]
|
||||
|
|
69
tests/components/device_tracker/test_asuswrt.py
Normal file
69
tests/components/device_tracker/test_asuswrt.py
Normal file
|
@ -0,0 +1,69 @@
|
|||
"""The tests for the ASUSWRT device tracker platform."""
|
||||
|
||||
import os
|
||||
import unittest
|
||||
from unittest import mock
|
||||
|
||||
from homeassistant.components import device_tracker
|
||||
from homeassistant.const import (CONF_PLATFORM, CONF_PASSWORD, CONF_USERNAME,
|
||||
CONF_HOST)
|
||||
|
||||
from tests.common import get_test_home_assistant
|
||||
|
||||
|
||||
class TestComponentsDeviceTrackerASUSWRT(unittest.TestCase):
|
||||
"""Tests for the ASUSWRT device tracker platform."""
|
||||
|
||||
def setUp(self): # pylint: disable=invalid-name
|
||||
"""Setup things to be run when tests are started."""
|
||||
self.hass = get_test_home_assistant()
|
||||
|
||||
def tearDown(self): # pylint: disable=invalid-name
|
||||
"""Stop everything that was started."""
|
||||
try:
|
||||
os.remove(self.hass.config.path(device_tracker.YAML_DEVICES))
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
|
||||
def test_password_or_pub_key_required(self):
|
||||
"""Test creating an AsusWRT scanner without a pass or pubkey."""
|
||||
self.assertIsNone(device_tracker.asuswrt.get_scanner(
|
||||
self.hass, {device_tracker.DOMAIN: {
|
||||
CONF_PLATFORM: 'asuswrt',
|
||||
CONF_HOST: 'fake_host',
|
||||
CONF_USERNAME: 'fake_user'
|
||||
}}))
|
||||
|
||||
@mock.patch(
|
||||
'homeassistant.components.device_tracker.asuswrt.AsusWrtDeviceScanner',
|
||||
return_value=mock.MagicMock())
|
||||
def test_get_scanner_with_password_no_pubkey(self, asuswrt_mock):
|
||||
"""Test creating an AsusWRT scanner with a password and no pubkey."""
|
||||
conf_dict = {
|
||||
device_tracker.DOMAIN: {
|
||||
CONF_PLATFORM: 'asuswrt',
|
||||
CONF_HOST: 'fake_host',
|
||||
CONF_USERNAME: 'fake_user',
|
||||
CONF_PASSWORD: 'fake_pass'
|
||||
}
|
||||
}
|
||||
self.assertIsNotNone(device_tracker.asuswrt.get_scanner(
|
||||
self.hass, conf_dict))
|
||||
asuswrt_mock.assert_called_once_with(conf_dict[device_tracker.DOMAIN])
|
||||
|
||||
@mock.patch(
|
||||
'homeassistant.components.device_tracker.asuswrt.AsusWrtDeviceScanner',
|
||||
return_value=mock.MagicMock())
|
||||
def test_get_scanner_with_pubkey_no_password(self, asuswrt_mock):
|
||||
"""Test creating an AsusWRT scanner with a pubkey and no password."""
|
||||
conf_dict = {
|
||||
device_tracker.DOMAIN: {
|
||||
CONF_PLATFORM: 'asuswrt',
|
||||
CONF_HOST: 'fake_host',
|
||||
CONF_USERNAME: 'fake_user',
|
||||
'pub_key': '/fake_path'
|
||||
}
|
||||
}
|
||||
self.assertIsNotNone(device_tracker.asuswrt.get_scanner(
|
||||
self.hass, conf_dict))
|
||||
asuswrt_mock.assert_called_once_with(conf_dict[device_tracker.DOMAIN])
|
Loading…
Add table
Reference in a new issue