hass-core/homeassistant/components/device_tracker/asuswrt.py
Jeff Schroeder 7c7b6ca05c Fix the asuswrt device tracker for dhcp leases with no hostname
Sometimes, hosts request dhcp leases without sending the hostname
they want to the dhcp server. This results in the entity_id being
`device_tracker.` as the dev_id is empty and things go downhill
from there.

The dhcp lease file looks like:
    admin@RT-AC66R:/tmp/home/root# cat /var/lib/misc/dnsmasq.leases
    86400 5c:c5:d4:79:4c:ad 192.168.1.226 chit-jsl3 *
    85242 8c:77:12:ad:d9:23 192.168.1.126 android-2c94abebaab16255 01:8c:77:12:ad:d9:23
    61985 b8:e9:37:73:47:f0 192.168.1.204 * 01:b8:e9:37:73:47:f0
    61982 b8:e9:37:ec:0d:7e 192.168.1.132 * 01:b8:e9:37:ec:0d:7e
    84584 00:20:6b:ca:31:c1 192.168.1.182 MC4650-CA31C1 01:00:20:6b:ca:31:c1
    86306 fc:e9:98:d6:4b:90 192.168.1.173 iLol 01:fc:e9:98:d6:4b:90
    74343 20:3a:07:f3:7e:ae 192.168.1.246 gatekeeper 01:20:3a:07:f3:7e:ae
    72374 b8:e9:37:5f:3d:06 192.168.1.34 SonosZP 01:b8:e9:37:5f:3d:06
    64697 00:0e:58:6f:59:d2 192.168.1.171 SonosZB 01:00:0e:58:6f:59:d2

Confirmed working on an Asus RT-AC66R with fw version: 3.0.0.4.376_3861
2015-09-14 20:33:14 -05:00

179 lines
5.6 KiB
Python

"""
homeassistant.components.device_tracker.asuswrt
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Device tracker platform that supports scanning an ASUSWRT router for device
presence.
This device tracker needs telnet to be enabled on the router.
Configuration:
To use the ASUSWRT tracker you will need to add something like the following
to your configuration.yaml file.
device_tracker:
  platform: asuswrt
  host: YOUR_ROUTER_IP
  username: YOUR_ADMIN_USERNAME
  password: YOUR_ADMIN_PASSWORD
Variables:
host
*Required
The IP address of your router, e.g. 192.168.1.1.
username
*Required
The username of a user with administrative privileges, usually 'admin'.
password
*Required
The password for your given admin account.
"""
import logging
from datetime import timedelta
import re
import threading
import telnetlib
from homeassistant.const import CONF_HOST, CONF_USERNAME, CONF_PASSWORD
from homeassistant.helpers import validate_config
from homeassistant.util import Throttle
from homeassistant.components.device_tracker import DOMAIN
# Serve cached results if the last scan was less than this long ago
MIN_TIME_BETWEEN_SCANS = timedelta(seconds=5)

_LOGGER = logging.getLogger(__name__)

# Matches one row of the dnsmasq leases file read by the scanner:
# a leading word, the client MAC (lowercase hex), its IPv4 address and
# the hostname column (any run of non-whitespace characters).
_LEASES_REGEX = re.compile(
    r'\w+\s'
    r'(?P<mac>(([0-9a-f]{2}[:-]){5}([0-9a-f]{2})))\s'
    r'(?P<ip>([0-9]{1,3}[\.]){3}[0-9]{1,3})\s'
    r'(?P<host>([^\s]+))')

# Matches one row of `ip neigh` output: an IPv4 address, two words,
# an optional "<word> <mac>" pair and finally the neighbour status word.
_IP_NEIGH_REGEX = re.compile(
    r'(?P<ip>([0-9]{1,3}[\.]){3}[0-9]{1,3})\s'
    r'\w+\s'
    r'\w+\s'
    r'(\w+\s(?P<mac>(([0-9a-f]{2}[:-]){5}([0-9a-f]{2}))))?\s'
    r'(?P<status>(\w+))')
# pylint: disable=unused-argument
def get_scanner(hass, config):
    """ Validates config and returns an ASUS-WRT scanner.

    Returns None when validate_config rejects the configuration or when
    the scanner reports that its initial router query did not succeed.
    """
    required_keys = {DOMAIN: [CONF_HOST, CONF_USERNAME, CONF_PASSWORD]}
    if not validate_config(config, required_keys, _LOGGER):
        return None

    asus_scanner = AsusWrtDeviceScanner(config[DOMAIN])
    if asus_scanner.success_init:
        return asus_scanner
    return None
class AsusWrtDeviceScanner(object):
    """
    This class queries a router running ASUSWRT firmware
    for connected devices. Adapted from DD-WRT scanner.

    The router is queried over telnet: `ip neigh` supplies the status of
    each IP address, the dnsmasq leases file supplies MAC and hostname.
    """

    def __init__(self, config):
        """ Stores the credentials and performs one test query. """
        self.host = config[CONF_HOST]
        self.username = config[CONF_USERNAME]
        self.password = config[CONF_PASSWORD]

        self.lock = threading.Lock()

        # Client dicts ('host', 'status', 'ip', 'mac') kept from the most
        # recent successful scan; a list, as assigned by _update_info.
        self.last_results = []

        # Test the router is accessible
        data = self.get_asuswrt_data()
        self.success_init = data is not None

    def scan_devices(self):
        """
        Scans for new devices and return a list containing found device IDs.
        The IDs are the upper-cased MAC addresses of the active clients.
        """
        self._update_info()
        return [client['mac'] for client in self.last_results]

    def get_device_name(self, device):
        """ Returns the name of the given device or None if we don't know.

        `device` is compared against the stored MAC addresses. A client
        whose lease carries no hostname yields an empty string.
        """
        for client in self.last_results:
            if client['mac'] == device:
                return client['host']
        return None

    @Throttle(MIN_TIME_BETWEEN_SCANS)
    def _update_info(self):
        """
        Ensures the information from the ASUSWRT router is up to date.
        Returns boolean if scanning successful.
        """
        if not self.success_init:
            return False

        with self.lock:
            _LOGGER.info("Checking ARP")
            data = self.get_asuswrt_data()
            if not data:
                # Query failed or returned no leases: keep previous results.
                return False

            # Only neighbours in one of these states count as present.
            self.last_results = [
                client for client in data.values()
                if client['status'] in ('REACHABLE', 'DELAY', 'STALE')]
            return True

    def get_asuswrt_data(self):
        """ Retrieve data from ASUSWRT and return parsed result.

        Returns a dict keyed by IP address whose values are dicts with the
        keys 'host', 'status', 'ip' and 'mac', or None when the router
        could not be queried.
        """
        telnet = None
        try:
            telnet = telnetlib.Telnet(self.host)
            telnet.read_until(b'login: ')
            telnet.write((self.username + '\n').encode('ascii'))
            telnet.read_until(b'Password: ')
            telnet.write((self.password + '\n').encode('ascii'))
            # The last line received up to '#' is used as the end marker
            # for the output of the commands below.
            prompt_string = telnet.read_until(b'#').split(b'\n')[-1]
            telnet.write('ip neigh\n'.encode('ascii'))
            # Drop the first line (command echo) and the last (prompt).
            neighbors = telnet.read_until(prompt_string).split(b'\n')[1:-1]
            telnet.write('cat /var/lib/misc/dnsmasq.leases\n'.encode('ascii'))
            leases_result = telnet.read_until(prompt_string).split(b'\n')[1:-1]
            telnet.write('exit\n'.encode('ascii'))
        except EOFError:
            _LOGGER.exception("Unexpected response from router")
            return None
        except ConnectionRefusedError:
            _LOGGER.exception("Connection refused by router," +
                              " is telnet enabled?")
            return None
        except OSError:
            # Any other socket-level failure (timeout, unreachable host,
            # name resolution) is reported as a failed query, not a crash.
            _LOGGER.exception("Unable to communicate with router")
            return None
        finally:
            # Always release the socket, whether the query worked or not.
            if telnet is not None:
                telnet.close()

        return self._parse_asuswrt_data(leases_result, neighbors)

    @staticmethod
    def _parse_asuswrt_data(leases_result, neighbors):
        """ Combines raw lease and neighbour lines into a device dict.

        Both arguments are lists of bytes lines. Lines that the module
        level patterns cannot match are skipped.
        """
        devices = {}
        for lease in leases_result:
            match = _LEASES_REGEX.search(lease.decode('utf-8'))
            if not match:
                _LOGGER.debug("Skipping unparsable lease line: %r", lease)
                continue

            # For leases where the client doesn't set a hostname, ensure
            # it is blank and not '*', which breaks the entity_id down
            # the line
            host = match.group('host')
            if host == '*':
                host = ''

            devices[match.group('ip')] = {
                'host': host,
                'status': '',
                'ip': match.group('ip'),
                'mac': match.group('mac').upper(),
            }

        for neighbor in neighbors:
            match = _IP_NEIGH_REGEX.search(neighbor.decode('utf-8'))
            if not match:
                # The pattern only covers IPv4 entries, so other rows
                # (for example IPv6 neighbours) end up here.
                _LOGGER.debug("Skipping unparsable neighbour line: %r",
                              neighbor)
                continue
            if match.group('ip') in devices:
                devices[match.group('ip')]['status'] = match.group('status')

        return devices