Validate unique_id in entity registry (#114648)

Co-authored-by: Shay Levy <levyshay1@gmail.com>
Co-authored-by: J. Nick Koston <nick@koston.org>
This commit is contained in:
Erik Montnemery 2024-04-04 21:51:44 +02:00 committed by GitHub
parent 1c2499b03a
commit 7c95ecff20
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 244 additions and 33 deletions

View file

@ -10,7 +10,7 @@ timer.
from __future__ import annotations
from collections.abc import Callable, Iterable, KeysView, Mapping
from collections.abc import Callable, Hashable, Iterable, KeysView, Mapping
from datetime import datetime, timedelta
from enum import StrEnum
from functools import cached_property
@ -45,6 +45,7 @@ from homeassistant.core import (
valid_entity_id,
)
from homeassistant.exceptions import MaxLengthExceeded
from homeassistant.loader import async_suggest_report_issue
from homeassistant.util import slugify, uuid as uuid_util
from homeassistant.util.json import format_unserializable_data
from homeassistant.util.read_only_dict import ReadOnlyDict
@ -606,6 +607,56 @@ class EntityRegistryItems(BaseRegistryItems[RegistryEntry]):
return [data[key] for key in self._labels_index.get(label, ())]
def _validate_item(
hass: HomeAssistant,
domain: str,
platform: str,
unique_id: str | Hashable | UndefinedType | Any,
*,
disabled_by: RegistryEntryDisabler | None | UndefinedType = None,
entity_category: EntityCategory | None | UndefinedType = None,
hidden_by: RegistryEntryHider | None | UndefinedType = None,
) -> None:
"""Validate entity registry item."""
if unique_id is not UNDEFINED and not isinstance(unique_id, Hashable):
raise TypeError(f"unique_id must be a string, got {unique_id}")
if unique_id is not UNDEFINED and not isinstance(unique_id, str):
# In HA Core 2025.4, we should fail if unique_id is not a string
report_issue = async_suggest_report_issue(hass, integration_domain=platform)
_LOGGER.error(
("'%s' from integration %s has a non string unique_id" " '%s', please %s"),
domain,
platform,
unique_id,
report_issue,
)
return
if (
disabled_by
and disabled_by is not UNDEFINED
and not isinstance(disabled_by, RegistryEntryDisabler)
):
raise ValueError(
f"disabled_by must be a RegistryEntryDisabler value, got {disabled_by}"
)
if (
entity_category
and entity_category is not UNDEFINED
and not isinstance(entity_category, EntityCategory)
):
raise ValueError(
f"entity_category must be a valid EntityCategory instance, got {entity_category}"
)
if (
hidden_by
and hidden_by is not UNDEFINED
and not isinstance(hidden_by, RegistryEntryHider)
):
raise ValueError(
f"hidden_by must be a RegistryEntryHider value, got {hidden_by}"
)
class EntityRegistry(BaseRegistry):
"""Class to hold a registry of entities."""
@ -764,6 +815,16 @@ class EntityRegistry(BaseRegistry):
unit_of_measurement=unit_of_measurement,
)
_validate_item(
self.hass,
domain,
platform,
disabled_by=disabled_by,
entity_category=entity_category,
hidden_by=hidden_by,
unique_id=unique_id,
)
entity_registry_id: str | None = None
deleted_entity = self.deleted_entities.pop((domain, platform, unique_id), None)
if deleted_entity is not None:
@ -776,11 +837,6 @@ class EntityRegistry(BaseRegistry):
known_object_ids,
)
if disabled_by and not isinstance(disabled_by, RegistryEntryDisabler):
raise ValueError("disabled_by must be a RegistryEntryDisabler value")
if hidden_by and not isinstance(hidden_by, RegistryEntryHider):
raise ValueError("hidden_by must be a RegistryEntryHider value")
if (
disabled_by is None
and config_entry
@ -789,13 +845,6 @@ class EntityRegistry(BaseRegistry):
):
disabled_by = RegistryEntryDisabler.INTEGRATION
if (
entity_category
and entity_category is not UNDEFINED
and not isinstance(entity_category, EntityCategory)
):
raise ValueError("entity_category must be a valid EntityCategory instance")
def none_if_undefined(value: T | UndefinedType) -> T | None:
"""Return None if value is UNDEFINED, otherwise return value."""
return None if value is UNDEFINED else value
@ -954,26 +1003,6 @@ class EntityRegistry(BaseRegistry):
new_values: dict[str, Any] = {} # Dict with new key/value pairs
old_values: dict[str, Any] = {} # Dict with old key/value pairs
if (
disabled_by
and disabled_by is not UNDEFINED
and not isinstance(disabled_by, RegistryEntryDisabler)
):
raise ValueError("disabled_by must be a RegistryEntryDisabler value")
if (
hidden_by
and hidden_by is not UNDEFINED
and not isinstance(hidden_by, RegistryEntryHider)
):
raise ValueError("hidden_by must be a RegistryEntryHider value")
if (
entity_category
and entity_category is not UNDEFINED
and not isinstance(entity_category, EntityCategory)
):
raise ValueError("entity_category must be a valid EntityCategory instance")
for attr_name, value in (
("aliases", aliases),
("area_id", area_id),
@ -1002,6 +1031,18 @@ class EntityRegistry(BaseRegistry):
new_values[attr_name] = value
old_values[attr_name] = getattr(old, attr_name)
# Only validate if data has changed
if new_values or new_unique_id is not UNDEFINED:
_validate_item(
self.hass,
old.domain,
old.platform,
disabled_by=disabled_by,
entity_category=entity_category,
hidden_by=hidden_by,
unique_id=new_unique_id,
)
if new_entity_id is not UNDEFINED and new_entity_id != old.entity_id:
if not self._entity_id_available(new_entity_id, None):
raise ValueError("Entity with this ID is already registered")
@ -1170,6 +1211,27 @@ class EntityRegistry(BaseRegistry):
if entity["entity_category"] == "system":
entity["entity_category"] = None
try:
domain = split_entity_id(entity["entity_id"])[0]
_validate_item(
self.hass, domain, entity["platform"], entity["unique_id"]
)
except (TypeError, ValueError) as err:
report_issue = async_suggest_report_issue(
self.hass, integration_domain=entity["platform"]
)
_LOGGER.error(
(
"Entity registry entry '%s' from integration %s could not "
"be loaded: '%s', please %s"
),
entity["entity_id"],
entity["platform"],
str(err),
report_issue,
)
continue
entities[entity["entity_id"]] = RegistryEntry(
aliases=set(entity["aliases"]),
area_id=entity["area_id"],
@ -1205,6 +1267,13 @@ class EntityRegistry(BaseRegistry):
unit_of_measurement=entity["unit_of_measurement"],
)
for entity in data["deleted_entities"]:
try:
domain = split_entity_id(entity["entity_id"])[0]
_validate_item(
self.hass, domain, entity["platform"], entity["unique_id"]
)
except (TypeError, ValueError):
continue
key = (
split_entity_id(entity["entity_id"])[0],
entity["platform"],

View file

@ -447,6 +447,116 @@ async def test_filter_on_load(
assert entry_system_category.entity_category is None
@pytest.mark.parametrize("load_registries", [False])
async def test_load_bad_data(
hass: HomeAssistant,
hass_storage: dict[str, Any],
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test loading invalid data."""
hass_storage[er.STORAGE_KEY] = {
"version": er.STORAGE_VERSION_MAJOR,
"minor_version": er.STORAGE_VERSION_MINOR,
"data": {
"entities": [
{
"aliases": [],
"area_id": None,
"capabilities": None,
"categories": {},
"config_entry_id": None,
"device_class": None,
"device_id": None,
"disabled_by": None,
"entity_category": None,
"entity_id": "test.test1",
"has_entity_name": False,
"hidden_by": None,
"icon": None,
"id": "00001",
"labels": [],
"name": None,
"options": None,
"original_device_class": None,
"original_icon": None,
"original_name": None,
"platform": "super_platform",
"previous_unique_id": None,
"supported_features": 0,
"translation_key": None,
"unique_id": 123, # Should trigger warning
"unit_of_measurement": None,
},
{
"aliases": [],
"area_id": None,
"capabilities": None,
"categories": {},
"config_entry_id": None,
"device_class": None,
"device_id": None,
"disabled_by": None,
"entity_category": None,
"entity_id": "test.test2",
"has_entity_name": False,
"hidden_by": None,
"icon": None,
"id": "00002",
"labels": [],
"name": None,
"options": None,
"original_device_class": None,
"original_icon": None,
"original_name": None,
"platform": "super_platform",
"previous_unique_id": None,
"supported_features": 0,
"translation_key": None,
"unique_id": ["not", "valid"], # Should not load
"unit_of_measurement": None,
},
],
"deleted_entities": [
{
"config_entry_id": None,
"entity_id": "test.test3",
"id": "00003",
"orphaned_timestamp": None,
"platform": "super_platform",
"unique_id": 234, # Should trigger warning
},
{
"config_entry_id": None,
"entity_id": "test.test4",
"id": "00004",
"orphaned_timestamp": None,
"platform": "super_platform",
"unique_id": ["also", "not", "valid"], # Should not load
},
],
},
}
await er.async_load(hass)
registry = er.async_get(hass)
assert len(registry.entities) == 1
assert set(registry.entities.keys()) == {"test.test1"}
assert len(registry.deleted_entities) == 1
assert set(registry.deleted_entities.keys()) == {("test", "super_platform", 234)}
assert (
"'test' from integration super_platform has a non string unique_id '123', "
"please create a bug report" in caplog.text
)
assert (
"Entity registry entry 'test.test2' from integration super_platform could not "
"be loaded: 'unique_id must be a string, got ['not', 'valid']', please create "
"a bug report" in caplog.text
)
def test_async_get_entity_id(entity_registry: er.EntityRegistry) -> None:
"""Test that entity_id is returned."""
entry = entity_registry.async_get_or_create("light", "hue", "1234")
@ -1472,6 +1582,38 @@ async def test_hidden_by_str_not_allowed(entity_registry: er.EntityRegistry) ->
)
async def test_unique_id_non_hashable(
hass: HomeAssistant, entity_registry: er.EntityRegistry
) -> None:
"""Test unique_id which is not hashable."""
with pytest.raises(TypeError):
entity_registry.async_get_or_create("light", "hue", ["not", "valid"])
entity_id = entity_registry.async_get_or_create("light", "hue", "1234").entity_id
with pytest.raises(TypeError):
entity_registry.async_update_entity(entity_id, new_unique_id=["not", "valid"])
async def test_unique_id_non_string(
hass: HomeAssistant,
entity_registry: er.EntityRegistry,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test unique_id which is not a string."""
entity_registry.async_get_or_create("light", "hue", 1234)
assert (
"'light' from integration hue has a non string unique_id '1234', "
"please create a bug report" in caplog.text
)
entity_id = entity_registry.async_get_or_create("light", "hue", "1234").entity_id
entity_registry.async_update_entity(entity_id, new_unique_id=2345)
assert (
"'light' from integration hue has a non string unique_id '2345', "
"please create a bug report" in caplog.text
)
def test_migrate_entity_to_new_platform(
hass: HomeAssistant, entity_registry: er.EntityRegistry
) -> None: