diff --git a/homeassistant/components/dhcp/__init__.py b/homeassistant/components/dhcp/__init__.py index d184ec40383..e21b6cf88dc 100644 --- a/homeassistant/components/dhcp/__init__.py +++ b/homeassistant/components/dhcp/__init__.py @@ -18,6 +18,7 @@ from scapy.arch.common import compile_filter from scapy.config import conf from scapy.error import Scapy_Exception from scapy.layers.dhcp import DHCP +from scapy.layers.inet import IP from scapy.layers.l2 import Ether from scapy.sendrecv import AsyncSniffer @@ -41,7 +42,7 @@ from homeassistant.helpers.event import ( async_track_time_interval, ) from homeassistant.loader import async_get_dhcp -from homeassistant.util.network import is_link_local +from homeassistant.util.network import is_invalid, is_link_local, is_loopback from .const import DOMAIN @@ -93,8 +94,14 @@ class WatcherBase: def process_client(self, ip_address, hostname, mac_address): """Process a client.""" - if is_link_local(make_ip_address(ip_address)): - # Ignore self assigned addresses + made_ip_address = make_ip_address(ip_address) + + if ( + is_link_local(made_ip_address) + or is_loopback(made_ip_address) + or is_invalid(made_ip_address) + ): + # Ignore self assigned addresses, loopback, invalid return data = self._address_data.get(ip_address) @@ -318,7 +325,7 @@ class DHCPWatcher(WatcherBase): # DHCP request return - ip_address = _decode_dhcp_option(options, REQUESTED_ADDR) + ip_address = _decode_dhcp_option(options, REQUESTED_ADDR) or packet[IP].src hostname = _decode_dhcp_option(options, HOSTNAME) mac_address = _format_mac(packet[Ether].src) diff --git a/homeassistant/util/network.py b/homeassistant/util/network.py index c36e7f3793a..e714b6b6b31 100644 --- a/homeassistant/util/network.py +++ b/homeassistant/util/network.py @@ -44,6 +44,11 @@ def is_local(address: IPv4Address | IPv6Address) -> bool: return is_loopback(address) or is_private(address) +def is_invalid(address: IPv4Address | IPv6Address) -> bool: + """Check if an address is invalid.""" + return bool(address == ip_address("0.0.0.0")) + + def is_ip_address(address: str) -> bool: """Check if a given string is an IP address.""" try: diff --git a/tests/components/dhcp/test_init.py b/tests/components/dhcp/test_init.py index 69e15104092..25fbbea459a 100644 --- a/tests/components/dhcp/test_init.py +++ b/tests/components/dhcp/test_init.py @@ -50,6 +50,36 @@ RAW_DHCP_REQUEST = ( b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" ) +# iRobot-AE9EC12DD3B04885BCBFA36AFB01E1CC 50:14:79:03:85:2c 192.168.1.120 +RAW_DHCP_RENEWAL = ( + b"\x00\x15\x5d\x8e\xed\x02\x50\x14\x79\x03\x85\x2c\x08\x00\x45\x00" + b"\x01\x8e\x51\xd2\x40\x00\x40\x11\x63\xa1\xc0\xa8\x01\x78\xc0\xa8" + b"\x01\x23\x00\x44\x00\x43\x01\x7a\x12\x09\x01\x01\x06\x00\xd4\xea" + b"\xb2\xfd\xff\xff\x00\x00\xc0\xa8\x01\x78\x00\x00\x00\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00\x50\x14\x79\x03\x85\x2c\x00\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00\x63\x82\x53\x63\x35\x01\x03\x39\x02\x05" + b"\xdc\x3c\x45\x64\x68\x63\x70\x63\x64\x2d\x35\x2e\x32\x2e\x31\x30" + b"\x3a\x4c\x69\x6e\x75\x78\x2d\x33\x2e\x31\x38\x2e\x37\x31\x3a\x61" + b"\x72\x6d\x76\x37\x6c\x3a\x51\x75\x61\x6c\x63\x6f\x6d\x6d\x20\x54" + b"\x65\x63\x68\x6e\x6f\x6c\x6f\x67\x69\x65\x73\x2c\x20\x49\x6e\x63" + b"\x20\x41\x50\x51\x38\x30\x30\x39\x0c\x27\x69\x52\x6f\x62\x6f\x74" + b"\x2d\x41\x45\x39\x45\x43\x31\x32\x44\x44\x33\x42\x30\x34\x38\x38" + b"\x35\x42\x43\x42\x46\x41\x33\x36\x41\x46\x42\x30\x31\x45\x31\x43" + b"\x43\x37\x08\x01\x21\x03\x06\x1c\x33\x3a\x3b\xff" +) + async def test_dhcp_match_hostname_and_macaddress(hass): """Test matching based on hostname and macaddress.""" @@ -76,6 +106,31 @@ async def test_dhcp_match_hostname_and_macaddress(hass): } +async def test_dhcp_renewal_match_hostname_and_macaddress(hass): + """Test renewal matching based on hostname and macaddress.""" + dhcp_watcher = dhcp.DHCPWatcher( + hass, + {}, + [{"domain": "mock-domain", "hostname": "irobot-*", "macaddress": "501479*"}], + ) + + packet = Ether(RAW_DHCP_RENEWAL) + + with patch.object(hass.config_entries.flow, "async_init") as mock_init: + dhcp_watcher.handle_dhcp_packet(packet) + # Ensure no change is ignored + dhcp_watcher.handle_dhcp_packet(packet) + + assert len(mock_init.mock_calls) == 1 + assert mock_init.mock_calls[0][1][0] == "mock-domain" + assert mock_init.mock_calls[0][2]["context"] == {"source": "dhcp"} + assert mock_init.mock_calls[0][2]["data"] == { + dhcp.IP_ADDRESS: "192.168.1.120", + dhcp.HOSTNAME: "irobot-ae9ec12dd3b04885bcbfa36afb01e1cc", + dhcp.MAC_ADDRESS: "50147903852c", + } + + async def test_dhcp_match_hostname(hass): """Test matching based on hostname only.""" dhcp_watcher = dhcp.DHCPWatcher( diff --git a/tests/util/test_network.py b/tests/util/test_network.py index 2cd710e1d6c..089ef5e0ab8 100644 --- a/tests/util/test_network.py +++ b/tests/util/test_network.py @@ -33,6 +33,12 @@ def test_is_link_local(): assert not network_util.is_link_local(ip_address("127.0.0.1")) +def test_is_invalid(): + """Test invalid address.""" + assert network_util.is_invalid(ip_address("0.0.0.0")) + assert not network_util.is_invalid(ip_address("127.0.0.1")) + + def test_is_local(): """Test local addresses.""" assert network_util.is_local(ip_address("192.168.0.1"))