Try multiple methods of getting data in asuswrt. (#11140)

* Try multiple methods of getting data in asuswrt.

Solves #11108 and potentially #8112.

* fix style

* fix lint
This commit is contained in:
Przemek Więch 2017-12-17 12:46:47 +01:00 committed by Pascal Vizeli
parent 3375261f51
commit 024f1d4882
2 changed files with 99 additions and 97 deletions

View file

@ -67,6 +67,15 @@ _IP_NEIGH_REGEX = re.compile(
r'\s?(router)?' r'\s?(router)?'
r'(?P<status>(\w+))') r'(?P<status>(\w+))')
_ARP_CMD = 'arp -n'
_ARP_REGEX = re.compile(
r'.+\s' +
r'\((?P<ip>([0-9]{1,3}[\.]){3}[0-9]{1,3})\)\s' +
r'.+\s' +
r'(?P<mac>(([0-9a-f]{2}[:-]){5}([0-9a-f]{2})))' +
r'\s' +
r'.*')
# pylint: disable=unused-argument # pylint: disable=unused-argument
def get_scanner(hass, config): def get_scanner(hass, config):
@ -76,7 +85,22 @@ def get_scanner(hass, config):
return scanner if scanner.success_init else None return scanner if scanner.success_init else None
AsusWrtResult = namedtuple('AsusWrtResult', 'neighbors leases') def _parse_lines(lines, regex):
"""Parse the lines using the given regular expression.
If a line can't be parsed it is logged and skipped in the output.
"""
results = []
for line in lines:
match = regex.search(line)
if not match:
_LOGGER.debug("Could not parse row: %s", line)
continue
results.append(match.groupdict())
return results
Device = namedtuple('Device', ['mac', 'ip', 'name'])
class AsusWrtDeviceScanner(DeviceScanner): class AsusWrtDeviceScanner(DeviceScanner):
@ -121,16 +145,13 @@ class AsusWrtDeviceScanner(DeviceScanner):
def scan_devices(self): def scan_devices(self):
"""Scan for new devices and return a list with found device IDs.""" """Scan for new devices and return a list with found device IDs."""
self._update_info() self._update_info()
return [client['mac'] for client in self.last_results] return list(self.last_results.keys())
def get_device_name(self, device): def get_device_name(self, device):
"""Return the name of the given device or None if we don't know.""" """Return the name of the given device or None if we don't know."""
if not self.last_results: if device not in self.last_results:
return None return None
for client in self.last_results: return self.last_results[device].name
if client['mac'] == device:
return client['host']
return None
def _update_info(self): def _update_info(self):
"""Ensure the information from the ASUSWRT router is up to date. """Ensure the information from the ASUSWRT router is up to date.
@ -145,72 +166,71 @@ class AsusWrtDeviceScanner(DeviceScanner):
if not data: if not data:
return False return False
active_clients = [client for client in data.values() if self.last_results = data
client['status'] == 'REACHABLE' or
client['status'] == 'DELAY' or
client['status'] == 'STALE' or
client['status'] == 'IN_ASSOCLIST']
self.last_results = active_clients
return True return True
def get_asuswrt_data(self): def get_asuswrt_data(self):
"""Retrieve data from ASUSWRT and return parsed result.""" """Retrieve data from ASUSWRT.
result = self.connection.get_result()
if not result:
return {}
Calls various commands on the router and returns the superset of all
responses. Some commands will not work on some routers.
"""
devices = {} devices = {}
if self.mode == 'ap': devices.update(self._get_wl())
for lease in result.leases: devices.update(self._get_arp())
match = _WL_REGEX.search(lease.decode('utf-8')) devices.update(self._get_neigh())
if not self.mode == 'ap':
devices.update(self._get_leases())
return devices
if not match: def _get_wl(self):
_LOGGER.warning("Could not parse wl row: %s", lease) lines = self.connection.run_command(_WL_CMD)
continue if not lines:
return {}
result = _parse_lines(lines, _WL_REGEX)
devices = {}
for device in result:
mac = device['mac'].upper()
devices[mac] = Device(mac, None, None)
return devices
def _get_leases(self):
lines = self.connection.run_command(_LEASES_CMD)
if not lines:
return {}
lines = [line for line in lines if not line.startswith('duid ')]
result = _parse_lines(lines, _LEASES_REGEX)
devices = {}
for device in result:
# For leases where the client doesn't set a hostname, ensure it
# is blank and not '*', which breaks entity_id down the line.
host = device['host']
if host == '*':
host = '' host = ''
mac = device['mac'].upper()
devices[mac] = Device(mac, device['ip'], host)
return devices
devices[match.group('mac').upper()] = { def _get_neigh(self):
'host': host, lines = self.connection.run_command(_IP_NEIGH_CMD)
'status': 'IN_ASSOCLIST', if not lines:
'ip': '', return {}
'mac': match.group('mac').upper(), result = _parse_lines(lines, _IP_NEIGH_REGEX)
} devices = {}
for device in result:
else: mac = device['mac'].upper()
for lease in result.leases: devices[mac] = Device(mac, None, None)
if lease.startswith(b'duid '): return devices
continue
match = _LEASES_REGEX.search(lease.decode('utf-8'))
if not match:
_LOGGER.warning("Could not parse lease row: %s", lease)
continue
# For leases where the client doesn't set a hostname, ensure it
# is blank and not '*', which breaks entity_id down the line.
host = match.group('host')
if host == '*':
host = ''
devices[match.group('mac')] = {
'host': host,
'status': '',
'ip': match.group('ip'),
'mac': match.group('mac').upper(),
}
for neighbor in result.neighbors:
match = _IP_NEIGH_REGEX.search(neighbor.decode('utf-8'))
if not match:
_LOGGER.warning("Could not parse neighbor row: %s",
neighbor)
continue
if match.group('mac') in devices:
devices[match.group('mac')]['status'] = (
match.group('status'))
def _get_arp(self):
lines = self.connection.run_command(_ARP_CMD)
if not lines:
return {}
result = _parse_lines(lines, _ARP_REGEX)
devices = {}
for device in result:
mac = device['mac'].upper()
devices[mac] = Device(mac, device['ip'], None)
return devices return devices
@ -247,8 +267,8 @@ class SshConnection(_Connection):
self._ssh_key = ssh_key self._ssh_key = ssh_key
self._ap = ap self._ap = ap
def get_result(self): def run_command(self, command):
"""Retrieve a single AsusWrtResult through an SSH connection. """Run commands through an SSH connection.
Connect to the SSH server if not currently connected, otherwise Connect to the SSH server if not currently connected, otherwise
use the existing connection. use the existing connection.
@ -258,19 +278,10 @@ class SshConnection(_Connection):
try: try:
if not self.connected: if not self.connected:
self.connect() self.connect()
if self._ap: self._ssh.sendline(command)
neighbors = [''] self._ssh.prompt()
self._ssh.sendline(_WL_CMD) lines = self._ssh.before.split(b'\n')[1:-1]
self._ssh.prompt() return [line.decode('utf-8') for line in lines]
leases_result = self._ssh.before.split(b'\n')[1:-1]
else:
self._ssh.sendline(_IP_NEIGH_CMD)
self._ssh.prompt()
neighbors = self._ssh.before.split(b'\n')[1:-1]
self._ssh.sendline(_LEASES_CMD)
self._ssh.prompt()
leases_result = self._ssh.before.split(b'\n')[1:-1]
return AsusWrtResult(neighbors, leases_result)
except exceptions.EOF as err: except exceptions.EOF as err:
_LOGGER.error("Connection refused. SSH enabled?") _LOGGER.error("Connection refused. SSH enabled?")
self.disconnect() self.disconnect()
@ -326,8 +337,8 @@ class TelnetConnection(_Connection):
self._ap = ap self._ap = ap
self._prompt_string = None self._prompt_string = None
def get_result(self): def run_command(self, command):
"""Retrieve a single AsusWrtResult through a Telnet connection. """Run a command through a Telnet connection.
Connect to the Telnet server if not currently connected, otherwise Connect to the Telnet server if not currently connected, otherwise
use the existing connection. use the existing connection.
@ -336,18 +347,9 @@ class TelnetConnection(_Connection):
if not self.connected: if not self.connected:
self.connect() self.connect()
self._telnet.write('{}\n'.format(_IP_NEIGH_CMD).encode('ascii')) self._telnet.write('{}\n'.format(command).encode('ascii'))
neighbors = (self._telnet.read_until(self._prompt_string). return (self._telnet.read_until(self._prompt_string).
split(b'\n')[1:-1]) split(b'\n')[1:-1])
if self._ap:
self._telnet.write('{}\n'.format(_WL_CMD).encode('ascii'))
leases_result = (self._telnet.read_until(self._prompt_string).
split(b'\n')[1:-1])
else:
self._telnet.write('{}\n'.format(_LEASES_CMD).encode('ascii'))
leases_result = (self._telnet.read_until(self._prompt_string).
split(b'\n')[1:-1])
return AsusWrtResult(neighbors, leases_result)
except EOFError: except EOFError:
_LOGGER.error("Unexpected response from router") _LOGGER.error("Unexpected response from router")
self.disconnect() self.disconnect()

View file

@ -144,7 +144,7 @@ class TestComponentsDeviceTrackerASUSWRT(unittest.TestCase):
update_mock.start() update_mock.start()
self.addCleanup(update_mock.stop) self.addCleanup(update_mock.stop)
asuswrt = device_tracker.asuswrt.AsusWrtDeviceScanner(conf_dict) asuswrt = device_tracker.asuswrt.AsusWrtDeviceScanner(conf_dict)
asuswrt.connection.get_result() asuswrt.connection.run_command('ls')
self.assertEqual(ssh.login.call_count, 1) self.assertEqual(ssh.login.call_count, 1)
self.assertEqual( self.assertEqual(
ssh.login.call_args, ssh.login.call_args,
@ -170,7 +170,7 @@ class TestComponentsDeviceTrackerASUSWRT(unittest.TestCase):
update_mock.start() update_mock.start()
self.addCleanup(update_mock.stop) self.addCleanup(update_mock.stop)
asuswrt = device_tracker.asuswrt.AsusWrtDeviceScanner(conf_dict) asuswrt = device_tracker.asuswrt.AsusWrtDeviceScanner(conf_dict)
asuswrt.connection.get_result() asuswrt.connection.run_command('ls')
self.assertEqual(ssh.login.call_count, 1) self.assertEqual(ssh.login.call_count, 1)
self.assertEqual( self.assertEqual(
ssh.login.call_args, ssh.login.call_args,
@ -225,9 +225,9 @@ class TestComponentsDeviceTrackerASUSWRT(unittest.TestCase):
update_mock.start() update_mock.start()
self.addCleanup(update_mock.stop) self.addCleanup(update_mock.stop)
asuswrt = device_tracker.asuswrt.AsusWrtDeviceScanner(conf_dict) asuswrt = device_tracker.asuswrt.AsusWrtDeviceScanner(conf_dict)
asuswrt.connection.get_result() asuswrt.connection.run_command('ls')
self.assertEqual(telnet.read_until.call_count, 5) self.assertEqual(telnet.read_until.call_count, 4)
self.assertEqual(telnet.write.call_count, 4) self.assertEqual(telnet.write.call_count, 3)
self.assertEqual( self.assertEqual(
telnet.read_until.call_args_list[0], telnet.read_until.call_args_list[0],
mock.call(b'login: ') mock.call(b'login: ')