Adding tests to check the component after latest patch
This commit is contained in:
parent
5af7666a61
commit
2d8ef36a6c
3 changed files with 295 additions and 47 deletions
|
@ -341,7 +341,6 @@ omit =
|
|||
homeassistant/components/cover/scsgate.py
|
||||
homeassistant/components/device_tracker/actiontec.py
|
||||
homeassistant/components/device_tracker/aruba.py
|
||||
homeassistant/components/device_tracker/asuswrt.py
|
||||
homeassistant/components/device_tracker/automatic.py
|
||||
homeassistant/components/device_tracker/bbox.py
|
||||
homeassistant/components/device_tracker/bluetooth_le_tracker.py
|
||||
|
|
|
@ -25,9 +25,7 @@ _LOGGER = logging.getLogger(__name__)
|
|||
|
||||
CONF_PUB_KEY = 'pub_key'
|
||||
CONF_SSH_KEY = 'ssh_key'
|
||||
|
||||
DEFAULT_SSH_PORT = 22
|
||||
|
||||
SECRET_GROUP = 'Password or SSH Key'
|
||||
|
||||
PLATFORM_SCHEMA = vol.All(
|
||||
|
@ -118,20 +116,10 @@ class AsusWrtDeviceScanner(DeviceScanner):
|
|||
self.port = config[CONF_PORT]
|
||||
|
||||
if self.protocol == 'ssh':
|
||||
if not (self.ssh_key or self.password):
|
||||
_LOGGER.error("No password or private key specified")
|
||||
self.success_init = False
|
||||
return
|
||||
|
||||
self.connection = SshConnection(
|
||||
self.host, self.port, self.username, self.password,
|
||||
self.ssh_key, self.mode == 'ap')
|
||||
else:
|
||||
if not self.password:
|
||||
_LOGGER.error("No password specified")
|
||||
self.success_init = False
|
||||
return
|
||||
|
||||
self.connection = TelnetConnection(
|
||||
self.host, self.port, self.username, self.password,
|
||||
self.mode == 'ap')
|
||||
|
@ -177,11 +165,16 @@ class AsusWrtDeviceScanner(DeviceScanner):
|
|||
"""
|
||||
devices = {}
|
||||
devices.update(self._get_wl())
|
||||
devices = self._get_arp(devices)
|
||||
devices = self._get_neigh(devices)
|
||||
devices.update(self._get_arp())
|
||||
devices.update(self._get_neigh(devices))
|
||||
if not self.mode == 'ap':
|
||||
devices.update(self._get_leases(devices))
|
||||
return devices
|
||||
|
||||
ret_devices = {}
|
||||
for key in devices:
|
||||
if devices[key].ip is not None:
|
||||
ret_devices[key] = devices[key]
|
||||
return ret_devices
|
||||
|
||||
def _get_wl(self):
|
||||
lines = self.connection.run_command(_WL_CMD)
|
||||
|
@ -219,18 +212,13 @@ class AsusWrtDeviceScanner(DeviceScanner):
|
|||
result = _parse_lines(lines, _IP_NEIGH_REGEX)
|
||||
devices = {}
|
||||
for device in result:
|
||||
if device['mac']:
|
||||
if device['mac'] is not None:
|
||||
mac = device['mac'].upper()
|
||||
devices[mac] = Device(mac, None, None)
|
||||
else:
|
||||
cur_devices = {
|
||||
k: v for k, v in
|
||||
cur_devices.items() if v.ip != device['ip']
|
||||
}
|
||||
cur_devices.update(devices)
|
||||
return cur_devices
|
||||
old_ip = cur_devices.get(mac, {}).ip or None
|
||||
devices[mac] = Device(mac, device.get('ip', old_ip), None)
|
||||
return devices
|
||||
|
||||
def _get_arp(self, cur_devices):
|
||||
def _get_arp(self):
|
||||
lines = self.connection.run_command(_ARP_CMD)
|
||||
if not lines:
|
||||
return {}
|
||||
|
@ -240,13 +228,7 @@ class AsusWrtDeviceScanner(DeviceScanner):
|
|||
if device['mac']:
|
||||
mac = device['mac'].upper()
|
||||
devices[mac] = Device(mac, device['ip'], None)
|
||||
else:
|
||||
cur_devices = {
|
||||
k: v for k, v in
|
||||
cur_devices.items() if v.ip != device['ip']
|
||||
}
|
||||
cur_devices.update(devices)
|
||||
return cur_devices
|
||||
return devices
|
||||
|
||||
|
||||
class _Connection:
|
||||
|
@ -272,7 +254,7 @@ class SshConnection(_Connection):
|
|||
|
||||
def __init__(self, host, port, username, password, ssh_key, ap):
|
||||
"""Initialize the SSH connection properties."""
|
||||
super(SshConnection, self).__init__()
|
||||
super().__init__()
|
||||
|
||||
self._ssh = None
|
||||
self._host = host
|
||||
|
@ -322,7 +304,7 @@ class SshConnection(_Connection):
|
|||
self._ssh.login(self._host, self._username,
|
||||
password=self._password, port=self._port)
|
||||
|
||||
super(SshConnection, self).connect()
|
||||
super().connect()
|
||||
|
||||
def disconnect(self): \
|
||||
# pylint: disable=broad-except
|
||||
|
@ -334,7 +316,7 @@ class SshConnection(_Connection):
|
|||
finally:
|
||||
self._ssh = None
|
||||
|
||||
super(SshConnection, self).disconnect()
|
||||
super().disconnect()
|
||||
|
||||
|
||||
class TelnetConnection(_Connection):
|
||||
|
@ -342,7 +324,7 @@ class TelnetConnection(_Connection):
|
|||
|
||||
def __init__(self, host, port, username, password, ap):
|
||||
"""Initialize the Telnet connection properties."""
|
||||
super(TelnetConnection, self).__init__()
|
||||
super().__init__()
|
||||
|
||||
self._telnet = None
|
||||
self._host = host
|
||||
|
@ -361,7 +343,6 @@ class TelnetConnection(_Connection):
|
|||
try:
|
||||
if not self.connected:
|
||||
self.connect()
|
||||
|
||||
self._telnet.write('{}\n'.format(command).encode('ascii'))
|
||||
data = (self._telnet.read_until(self._prompt_string).
|
||||
split(b'\n')[1:-1])
|
||||
|
@ -392,7 +373,7 @@ class TelnetConnection(_Connection):
|
|||
self._telnet.write((self._password + '\n').encode('ascii'))
|
||||
self._prompt_string = self._telnet.read_until(b'#').split(b'\n')[-1]
|
||||
|
||||
super(TelnetConnection, self).connect()
|
||||
super().connect()
|
||||
|
||||
def disconnect(self): \
|
||||
# pylint: disable=broad-except
|
||||
|
@ -402,4 +383,4 @@ class TelnetConnection(_Connection):
|
|||
except Exception:
|
||||
pass
|
||||
|
||||
super(TelnetConnection, self).disconnect()
|
||||
super().disconnect()
|
||||
|
|
|
@ -5,6 +5,7 @@ import unittest
|
|||
from unittest import mock
|
||||
|
||||
import voluptuous as vol
|
||||
from future.backports import socket
|
||||
|
||||
from homeassistant.setup import setup_component
|
||||
from homeassistant.components import device_tracker
|
||||
|
@ -12,8 +13,9 @@ from homeassistant.components.device_tracker import (
|
|||
CONF_CONSIDER_HOME, CONF_TRACK_NEW, CONF_NEW_DEVICE_DEFAULTS,
|
||||
CONF_AWAY_HIDE)
|
||||
from homeassistant.components.device_tracker.asuswrt import (
|
||||
CONF_PROTOCOL, CONF_MODE, CONF_PUB_KEY, DOMAIN,
|
||||
CONF_PORT, PLATFORM_SCHEMA)
|
||||
CONF_PROTOCOL, CONF_MODE, CONF_PUB_KEY, DOMAIN, _ARP_REGEX,
|
||||
CONF_PORT, PLATFORM_SCHEMA, Device, get_scanner, AsusWrtDeviceScanner,
|
||||
_parse_lines, SshConnection, TelnetConnection)
|
||||
from homeassistant.const import (CONF_PLATFORM, CONF_PASSWORD, CONF_USERNAME,
|
||||
CONF_HOST)
|
||||
|
||||
|
@ -23,6 +25,84 @@ from tests.common import (
|
|||
|
||||
FAKEFILE = None
|
||||
|
||||
VALID_CONFIG_ROUTER_SSH = {DOMAIN: {
|
||||
CONF_PLATFORM: 'asuswrt',
|
||||
CONF_HOST: 'fake_host',
|
||||
CONF_USERNAME: 'fake_user',
|
||||
CONF_PROTOCOL: 'ssh',
|
||||
CONF_MODE: 'router',
|
||||
CONF_PORT: '22'
|
||||
}
|
||||
}
|
||||
|
||||
WL_DATA = [
|
||||
'assoclist 01:02:03:04:06:08\r',
|
||||
'assoclist 08:09:10:11:12:14\r',
|
||||
'assoclist 08:09:10:11:12:15\r'
|
||||
]
|
||||
|
||||
WL_DEVICES = {
|
||||
'01:02:03:04:06:08': Device(
|
||||
mac='01:02:03:04:06:08', ip=None, name=None),
|
||||
'08:09:10:11:12:14': Device(
|
||||
mac='08:09:10:11:12:14', ip=None, name=None),
|
||||
'08:09:10:11:12:15': Device(
|
||||
mac='08:09:10:11:12:15', ip=None, name=None)
|
||||
}
|
||||
|
||||
ARP_DATA = [
|
||||
'? (123.123.123.125) at 01:02:03:04:06:08 [ether] on eth0\r',
|
||||
'? (123.123.123.126) at 08:09:10:11:12:14 [ether] on br0\r'
|
||||
'? (123.123.123.127) at <incomplete> on br0\r'
|
||||
]
|
||||
|
||||
ARP_DEVICES = {
|
||||
'01:02:03:04:06:08': Device(
|
||||
mac='01:02:03:04:06:08', ip='123.123.123.125', name=None),
|
||||
'08:09:10:11:12:14': Device(
|
||||
mac='08:09:10:11:12:14', ip='123.123.123.126', name=None)
|
||||
}
|
||||
|
||||
NEIGH_DATA = [
|
||||
'123.123.123.125 dev eth0 lladdr 01:02:03:04:06:08 REACHABLE\r',
|
||||
'123.123.123.126 dev br0 lladdr 08:09:10:11:12:14 STALE\r'
|
||||
'123.123.123.127 dev br0 FAILED\r'
|
||||
]
|
||||
|
||||
NEIGH_DEVICES = {
|
||||
'01:02:03:04:06:08': Device(
|
||||
mac='01:02:03:04:06:08', ip='123.123.123.125', name=None),
|
||||
'08:09:10:11:12:14': Device(
|
||||
mac='08:09:10:11:12:14', ip='123.123.123.126', name=None)
|
||||
}
|
||||
|
||||
LEASES_DATA = [
|
||||
'51910 01:02:03:04:06:08 123.123.123.125 TV 01:02:03:04:06:08\r',
|
||||
'79986 01:02:03:04:06:10 123.123.123.127 android 01:02:03:04:06:15\r',
|
||||
'23523 08:09:10:11:12:14 123.123.123.126 * 08:09:10:11:12:14\r',
|
||||
]
|
||||
|
||||
LEASES_DEVICES = {
|
||||
'01:02:03:04:06:08': Device(
|
||||
mac='01:02:03:04:06:08', ip='123.123.123.125', name='TV'),
|
||||
'08:09:10:11:12:14': Device(
|
||||
mac='08:09:10:11:12:14', ip='123.123.123.126', name='')
|
||||
}
|
||||
|
||||
WAKE_DEVICES = {
|
||||
'01:02:03:04:06:08': Device(
|
||||
mac='01:02:03:04:06:08', ip='123.123.123.125', name='TV'),
|
||||
'08:09:10:11:12:14': Device(
|
||||
mac='08:09:10:11:12:14', ip='123.123.123.126', name='')
|
||||
}
|
||||
|
||||
WAKE_DEVICES_AP = {
|
||||
'01:02:03:04:06:08': Device(
|
||||
mac='01:02:03:04:06:08', ip='123.123.123.125', name=None),
|
||||
'08:09:10:11:12:14': Device(
|
||||
mac='08:09:10:11:12:14', ip='123.123.123.126', name=None)
|
||||
}
|
||||
|
||||
|
||||
def setup_module():
|
||||
"""Setup the test module."""
|
||||
|
@ -55,6 +135,24 @@ class TestComponentsDeviceTrackerASUSWRT(unittest.TestCase):
|
|||
except FileNotFoundError:
|
||||
pass
|
||||
|
||||
def test_parse_lines_wrong_input(self):
|
||||
"""Testing parse lines."""
|
||||
output = _parse_lines("asdf asdfdfsafad", _ARP_REGEX)
|
||||
self.assertEqual(output, [])
|
||||
|
||||
def test_get_device_name(self):
|
||||
"""Test for getting name."""
|
||||
scanner = get_scanner(self.hass, VALID_CONFIG_ROUTER_SSH)
|
||||
scanner.last_results = WAKE_DEVICES
|
||||
self.assertEqual('TV', scanner.get_device_name('01:02:03:04:06:08'))
|
||||
self.assertEqual(None, scanner.get_device_name('01:02:03:04:08:08'))
|
||||
|
||||
def test_scan_devices(self):
|
||||
"""Test for scan devices."""
|
||||
scanner = get_scanner(self.hass, VALID_CONFIG_ROUTER_SSH)
|
||||
scanner.last_results = WAKE_DEVICES
|
||||
self.assertEqual(list(WAKE_DEVICES), scanner.scan_devices())
|
||||
|
||||
def test_password_or_pub_key_required(self): \
|
||||
# pylint: disable=invalid-name
|
||||
"""Test creating an AsusWRT scanner without a pass or pubkey."""
|
||||
|
@ -63,13 +161,14 @@ class TestComponentsDeviceTrackerASUSWRT(unittest.TestCase):
|
|||
self.hass, DOMAIN, {DOMAIN: {
|
||||
CONF_PLATFORM: 'asuswrt',
|
||||
CONF_HOST: 'fake_host',
|
||||
CONF_USERNAME: 'fake_user'
|
||||
CONF_USERNAME: 'fake_user',
|
||||
CONF_PROTOCOL: 'ssh'
|
||||
}})
|
||||
|
||||
@mock.patch(
|
||||
'homeassistant.components.device_tracker.asuswrt.AsusWrtDeviceScanner',
|
||||
return_value=mock.MagicMock())
|
||||
def test_get_scanner_with_password_no_pubkey(self, asuswrt_mock): \
|
||||
def test_get_scanner_with_password_no_pubkey(self, asuswrt_mock): \
|
||||
# pylint: disable=invalid-name
|
||||
"""Test creating an AsusWRT scanner with a password and no pubkey."""
|
||||
conf_dict = {
|
||||
|
@ -99,7 +198,7 @@ class TestComponentsDeviceTrackerASUSWRT(unittest.TestCase):
|
|||
@mock.patch(
|
||||
'homeassistant.components.device_tracker.asuswrt.AsusWrtDeviceScanner',
|
||||
return_value=mock.MagicMock())
|
||||
def test_get_scanner_with_pubkey_no_password(self, asuswrt_mock): \
|
||||
def test_get_scanner_with_pubkey_no_password(self, asuswrt_mock): \
|
||||
# pylint: disable=invalid-name
|
||||
"""Test creating an AsusWRT scanner with a pubkey and no password."""
|
||||
conf_dict = {
|
||||
|
@ -178,7 +277,7 @@ class TestComponentsDeviceTrackerASUSWRT(unittest.TestCase):
|
|||
password='fake_pass', port=22)
|
||||
)
|
||||
|
||||
def test_ssh_login_without_password_or_pubkey(self): \
|
||||
def test_ssh_login_without_password_or_pubkey(self): \
|
||||
# pylint: disable=invalid-name
|
||||
"""Test that login is not called without password or pub_key."""
|
||||
ssh = mock.MagicMock()
|
||||
|
@ -249,7 +348,7 @@ class TestComponentsDeviceTrackerASUSWRT(unittest.TestCase):
|
|||
mock.call(b'#')
|
||||
)
|
||||
|
||||
def test_telnet_login_without_password(self): \
|
||||
def test_telnet_login_without_password(self): \
|
||||
# pylint: disable=invalid-name
|
||||
"""Test that login is not called without password or pub_key."""
|
||||
telnet = mock.MagicMock()
|
||||
|
@ -277,3 +376,172 @@ class TestComponentsDeviceTrackerASUSWRT(unittest.TestCase):
|
|||
assert setup_component(self.hass, DOMAIN,
|
||||
{DOMAIN: conf_dict})
|
||||
telnet.login.assert_not_called()
|
||||
|
||||
def test_get_asuswrt_data(self):
|
||||
"""Test aususwrt data fetch."""
|
||||
scanner = get_scanner(self.hass, VALID_CONFIG_ROUTER_SSH)
|
||||
scanner._get_wl = mock.Mock()
|
||||
scanner._get_arp = mock.Mock()
|
||||
scanner._get_neigh = mock.Mock()
|
||||
scanner._get_leases = mock.Mock()
|
||||
scanner._get_wl.return_value = WL_DEVICES
|
||||
scanner._get_arp.return_value = ARP_DEVICES
|
||||
scanner._get_neigh.return_value = NEIGH_DEVICES
|
||||
scanner._get_leases.return_value = LEASES_DEVICES
|
||||
self.assertEqual(WAKE_DEVICES, scanner.get_asuswrt_data())
|
||||
|
||||
def test_get_asuswrt_data_ap(self):
|
||||
"""Test for get asuswrt_data in ap mode."""
|
||||
conf = VALID_CONFIG_ROUTER_SSH.copy()[DOMAIN]
|
||||
conf[CONF_MODE] = 'ap'
|
||||
scanner = AsusWrtDeviceScanner(conf)
|
||||
scanner._get_wl = mock.Mock()
|
||||
scanner._get_arp = mock.Mock()
|
||||
scanner._get_neigh = mock.Mock()
|
||||
scanner._get_leases = mock.Mock()
|
||||
scanner._get_wl.return_value = WL_DEVICES
|
||||
scanner._get_arp.return_value = ARP_DEVICES
|
||||
scanner._get_neigh.return_value = NEIGH_DEVICES
|
||||
scanner._get_leases.return_value = LEASES_DEVICES
|
||||
self.assertEqual(WAKE_DEVICES_AP, scanner.get_asuswrt_data())
|
||||
|
||||
def test_update_info(self):
|
||||
"""Test for update info."""
|
||||
scanner = get_scanner(self.hass, VALID_CONFIG_ROUTER_SSH)
|
||||
scanner.get_asuswrt_data = mock.Mock()
|
||||
scanner.get_asuswrt_data.return_value = WAKE_DEVICES
|
||||
self.assertTrue(scanner._update_info())
|
||||
self.assertTrue(scanner.last_results, WAKE_DEVICES)
|
||||
scanner.success_init = False
|
||||
self.assertFalse(scanner._update_info())
|
||||
|
||||
@mock.patch(
|
||||
'homeassistant.components.device_tracker.asuswrt.SshConnection')
|
||||
def test_get_wl(self, mocked_ssh):
|
||||
"""Testing wl."""
|
||||
mocked_ssh.run_command.return_value = WL_DATA
|
||||
scanner = get_scanner(self.hass, VALID_CONFIG_ROUTER_SSH)
|
||||
scanner.connection = mocked_ssh
|
||||
self.assertEqual(WL_DEVICES, scanner._get_wl())
|
||||
mocked_ssh.run_command.return_value = ''
|
||||
self.assertEqual({}, scanner._get_wl())
|
||||
|
||||
@mock.patch(
|
||||
'homeassistant.components.device_tracker.asuswrt.SshConnection')
|
||||
def test_get_arp(self, mocked_ssh):
|
||||
"""Testing arp."""
|
||||
mocked_ssh.run_command.return_value = ARP_DATA
|
||||
|
||||
scanner = get_scanner(self.hass, VALID_CONFIG_ROUTER_SSH)
|
||||
scanner.connection = mocked_ssh
|
||||
self.assertEqual(ARP_DEVICES, scanner._get_arp())
|
||||
mocked_ssh.run_command.return_value = ''
|
||||
self.assertEqual({}, scanner._get_arp())
|
||||
|
||||
@mock.patch(
|
||||
'homeassistant.components.device_tracker.asuswrt.SshConnection')
|
||||
def test_get_neigh(self, mocked_ssh):
|
||||
"""Testing neigh."""
|
||||
mocked_ssh.run_command.return_value = NEIGH_DATA
|
||||
|
||||
scanner = get_scanner(self.hass, VALID_CONFIG_ROUTER_SSH)
|
||||
scanner.connection = mocked_ssh
|
||||
self.assertEqual(NEIGH_DEVICES, scanner._get_neigh(ARP_DEVICES.copy()))
|
||||
mocked_ssh.run_command.return_value = ''
|
||||
self.assertEqual({}, scanner._get_neigh(ARP_DEVICES.copy()))
|
||||
|
||||
@mock.patch(
|
||||
'homeassistant.components.device_tracker.asuswrt.SshConnection')
|
||||
def test_get_leases(self, mocked_ssh):
|
||||
"""Testing leases."""
|
||||
mocked_ssh.run_command.return_value = LEASES_DATA
|
||||
|
||||
scanner = get_scanner(self.hass, VALID_CONFIG_ROUTER_SSH)
|
||||
scanner.connection = mocked_ssh
|
||||
self.assertEqual(
|
||||
LEASES_DEVICES, scanner._get_leases(NEIGH_DEVICES.copy()))
|
||||
mocked_ssh.run_command.return_value = ''
|
||||
self.assertEqual({}, scanner._get_leases(NEIGH_DEVICES.copy()))
|
||||
|
||||
|
||||
class TestSshConnection(unittest.TestCase):
|
||||
"""Testing SshConnection."""
|
||||
|
||||
def setUp(self):
|
||||
"""Setup test env."""
|
||||
self.connection = SshConnection(
|
||||
'fake', 'fake', 'fake', 'fake', 'fake', 'fake')
|
||||
self.connection._connected = True
|
||||
|
||||
def test_run_command_exception_eof(self):
|
||||
"""Testing exception in run_command."""
|
||||
from pexpect import exceptions
|
||||
self.connection._ssh = mock.Mock()
|
||||
self.connection._ssh.sendline = mock.Mock()
|
||||
self.connection._ssh.sendline.side_effect = exceptions.EOF('except')
|
||||
self.connection.run_command('test')
|
||||
self.assertFalse(self.connection._connected)
|
||||
self.assertIsNone(self.connection._ssh)
|
||||
|
||||
def test_run_command_exception_pxssh(self):
|
||||
"""Testing exception in run_command."""
|
||||
from pexpect import pxssh
|
||||
self.connection._ssh = mock.Mock()
|
||||
self.connection._ssh.sendline = mock.Mock()
|
||||
self.connection._ssh.sendline.side_effect = pxssh.ExceptionPxssh(
|
||||
'except')
|
||||
self.connection.run_command('test')
|
||||
self.assertFalse(self.connection._connected)
|
||||
self.assertIsNone(self.connection._ssh)
|
||||
|
||||
def test_run_command_assertion_error(self):
|
||||
"""Testing exception in run_command."""
|
||||
self.connection._ssh = mock.Mock()
|
||||
self.connection._ssh.sendline = mock.Mock()
|
||||
self.connection._ssh.sendline.side_effect = AssertionError('except')
|
||||
self.connection.run_command('test')
|
||||
self.assertFalse(self.connection._connected)
|
||||
self.assertIsNone(self.connection._ssh)
|
||||
|
||||
|
||||
class TestTelnetConnection(unittest.TestCase):
|
||||
"""Testing TelnetConnection."""
|
||||
|
||||
def setUp(self):
|
||||
"""Setup test env."""
|
||||
self.connection = TelnetConnection(
|
||||
'fake', 'fake', 'fake', 'fake', 'fake')
|
||||
self.connection._connected = True
|
||||
|
||||
def test_run_command_exception_eof(self):
|
||||
"""Testing EOFException in run_command."""
|
||||
self.connection._telnet = mock.Mock()
|
||||
self.connection._telnet.write = mock.Mock()
|
||||
self.connection._telnet.write.side_effect = EOFError('except')
|
||||
self.connection.run_command('test')
|
||||
self.assertFalse(self.connection._connected)
|
||||
|
||||
def test_run_command_exception_connection_refused(self):
|
||||
"""Testing ConnectionRefusedError in run_command."""
|
||||
self.connection._telnet = mock.Mock()
|
||||
self.connection._telnet.write = mock.Mock()
|
||||
self.connection._telnet.write.side_effect = ConnectionRefusedError(
|
||||
'except')
|
||||
self.connection.run_command('test')
|
||||
self.assertFalse(self.connection._connected)
|
||||
|
||||
def test_run_command_exception_gaierror(self):
|
||||
"""Testing socket.gaierror in run_command."""
|
||||
self.connection._telnet = mock.Mock()
|
||||
self.connection._telnet.write = mock.Mock()
|
||||
self.connection._telnet.write.side_effect = socket.gaierror('except')
|
||||
self.connection.run_command('test')
|
||||
self.assertFalse(self.connection._connected)
|
||||
|
||||
def test_run_command_exception_oserror(self):
|
||||
"""Testing OSError in run_command."""
|
||||
self.connection._telnet = mock.Mock()
|
||||
self.connection._telnet.write = mock.Mock()
|
||||
self.connection._telnet.write.side_effect = OSError('except')
|
||||
self.connection.run_command('test')
|
||||
self.assertFalse(self.connection._connected)
|
||||
|
|
Loading…
Add table
Reference in a new issue