Add support for capturing renewals to dhcp discovery (#48242)
This commit is contained in:
parent
b3b0904b94
commit
f91de1c8b9
4 changed files with 77 additions and 4 deletions
|
@ -18,6 +18,7 @@ from scapy.arch.common import compile_filter
|
|||
from scapy.config import conf
|
||||
from scapy.error import Scapy_Exception
|
||||
from scapy.layers.dhcp import DHCP
|
||||
from scapy.layers.inet import IP
|
||||
from scapy.layers.l2 import Ether
|
||||
from scapy.sendrecv import AsyncSniffer
|
||||
|
||||
|
@ -41,7 +42,7 @@ from homeassistant.helpers.event import (
|
|||
async_track_time_interval,
|
||||
)
|
||||
from homeassistant.loader import async_get_dhcp
|
||||
from homeassistant.util.network import is_link_local
|
||||
from homeassistant.util.network import is_invalid, is_link_local, is_loopback
|
||||
|
||||
from .const import DOMAIN
|
||||
|
||||
|
@ -93,8 +94,14 @@ class WatcherBase:
|
|||
|
||||
def process_client(self, ip_address, hostname, mac_address):
|
||||
"""Process a client."""
|
||||
if is_link_local(make_ip_address(ip_address)):
|
||||
# Ignore self assigned addresses
|
||||
made_ip_address = make_ip_address(ip_address)
|
||||
|
||||
if (
|
||||
is_link_local(made_ip_address)
|
||||
or is_loopback(made_ip_address)
|
||||
or is_invalid(made_ip_address)
|
||||
):
|
||||
# Ignore self assigned addresses, loopback, invalid
|
||||
return
|
||||
|
||||
data = self._address_data.get(ip_address)
|
||||
|
@ -318,7 +325,7 @@ class DHCPWatcher(WatcherBase):
|
|||
# DHCP request
|
||||
return
|
||||
|
||||
ip_address = _decode_dhcp_option(options, REQUESTED_ADDR)
|
||||
ip_address = _decode_dhcp_option(options, REQUESTED_ADDR) or packet[IP].src
|
||||
hostname = _decode_dhcp_option(options, HOSTNAME)
|
||||
mac_address = _format_mac(packet[Ether].src)
|
||||
|
||||
|
|
|
@ -44,6 +44,11 @@ def is_local(address: IPv4Address | IPv6Address) -> bool:
|
|||
return is_loopback(address) or is_private(address)
|
||||
|
||||
|
||||
def is_invalid(address: IPv4Address | IPv6Address) -> bool:
|
||||
"""Check if an address is invalid."""
|
||||
return bool(address == ip_address("0.0.0.0"))
|
||||
|
||||
|
||||
def is_ip_address(address: str) -> bool:
|
||||
"""Check if a given string is an IP address."""
|
||||
try:
|
||||
|
|
|
@ -50,6 +50,36 @@ RAW_DHCP_REQUEST = (
|
|||
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
|
||||
)
|
||||
|
||||
# iRobot-AE9EC12DD3B04885BCBFA36AFB01E1CC 50:14:79:03:85:2c 192.168.1.120
|
||||
RAW_DHCP_RENEWAL = (
|
||||
b"\x00\x15\x5d\x8e\xed\x02\x50\x14\x79\x03\x85\x2c\x08\x00\x45\x00"
|
||||
b"\x01\x8e\x51\xd2\x40\x00\x40\x11\x63\xa1\xc0\xa8\x01\x78\xc0\xa8"
|
||||
b"\x01\x23\x00\x44\x00\x43\x01\x7a\x12\x09\x01\x01\x06\x00\xd4\xea"
|
||||
b"\xb2\xfd\xff\xff\x00\x00\xc0\xa8\x01\x78\x00\x00\x00\x00\x00\x00"
|
||||
b"\x00\x00\x00\x00\x00\x00\x50\x14\x79\x03\x85\x2c\x00\x00\x00\x00"
|
||||
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
|
||||
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
|
||||
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
|
||||
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
|
||||
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
|
||||
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
|
||||
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
|
||||
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
|
||||
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
|
||||
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
|
||||
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
|
||||
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
|
||||
b"\x00\x00\x00\x00\x00\x00\x63\x82\x53\x63\x35\x01\x03\x39\x02\x05"
|
||||
b"\xdc\x3c\x45\x64\x68\x63\x70\x63\x64\x2d\x35\x2e\x32\x2e\x31\x30"
|
||||
b"\x3a\x4c\x69\x6e\x75\x78\x2d\x33\x2e\x31\x38\x2e\x37\x31\x3a\x61"
|
||||
b"\x72\x6d\x76\x37\x6c\x3a\x51\x75\x61\x6c\x63\x6f\x6d\x6d\x20\x54"
|
||||
b"\x65\x63\x68\x6e\x6f\x6c\x6f\x67\x69\x65\x73\x2c\x20\x49\x6e\x63"
|
||||
b"\x20\x41\x50\x51\x38\x30\x30\x39\x0c\x27\x69\x52\x6f\x62\x6f\x74"
|
||||
b"\x2d\x41\x45\x39\x45\x43\x31\x32\x44\x44\x33\x42\x30\x34\x38\x38"
|
||||
b"\x35\x42\x43\x42\x46\x41\x33\x36\x41\x46\x42\x30\x31\x45\x31\x43"
|
||||
b"\x43\x37\x08\x01\x21\x03\x06\x1c\x33\x3a\x3b\xff"
|
||||
)
|
||||
|
||||
|
||||
async def test_dhcp_match_hostname_and_macaddress(hass):
|
||||
"""Test matching based on hostname and macaddress."""
|
||||
|
@ -76,6 +106,31 @@ async def test_dhcp_match_hostname_and_macaddress(hass):
|
|||
}
|
||||
|
||||
|
||||
async def test_dhcp_renewal_match_hostname_and_macaddress(hass):
|
||||
"""Test renewal matching based on hostname and macaddress."""
|
||||
dhcp_watcher = dhcp.DHCPWatcher(
|
||||
hass,
|
||||
{},
|
||||
[{"domain": "mock-domain", "hostname": "irobot-*", "macaddress": "501479*"}],
|
||||
)
|
||||
|
||||
packet = Ether(RAW_DHCP_RENEWAL)
|
||||
|
||||
with patch.object(hass.config_entries.flow, "async_init") as mock_init:
|
||||
dhcp_watcher.handle_dhcp_packet(packet)
|
||||
# Ensure no change is ignored
|
||||
dhcp_watcher.handle_dhcp_packet(packet)
|
||||
|
||||
assert len(mock_init.mock_calls) == 1
|
||||
assert mock_init.mock_calls[0][1][0] == "mock-domain"
|
||||
assert mock_init.mock_calls[0][2]["context"] == {"source": "dhcp"}
|
||||
assert mock_init.mock_calls[0][2]["data"] == {
|
||||
dhcp.IP_ADDRESS: "192.168.1.120",
|
||||
dhcp.HOSTNAME: "irobot-ae9ec12dd3b04885bcbfa36afb01e1cc",
|
||||
dhcp.MAC_ADDRESS: "50147903852c",
|
||||
}
|
||||
|
||||
|
||||
async def test_dhcp_match_hostname(hass):
|
||||
"""Test matching based on hostname only."""
|
||||
dhcp_watcher = dhcp.DHCPWatcher(
|
||||
|
|
|
@ -33,6 +33,12 @@ def test_is_link_local():
|
|||
assert not network_util.is_link_local(ip_address("127.0.0.1"))
|
||||
|
||||
|
||||
def test_is_invalid():
|
||||
"""Test invalid address."""
|
||||
assert network_util.is_invalid(ip_address("0.0.0.0"))
|
||||
assert not network_util.is_invalid(ip_address("127.0.0.1"))
|
||||
|
||||
|
||||
def test_is_local():
|
||||
"""Test local addresses."""
|
||||
assert network_util.is_local(ip_address("192.168.0.1"))
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue