Compare commits
4 commits
dev
...
frenck-202
Author | SHA1 | Date | |
---|---|---|---|
|
b41aa2ceb3 | ||
|
03d9f84220 | ||
|
a0943f893c | ||
|
8e37992707 |
10 changed files with 954 additions and 0 deletions
|
@ -29,6 +29,7 @@ from .helpers import (
|
|||
device_registry,
|
||||
entity,
|
||||
entity_registry,
|
||||
folder_registry,
|
||||
issue_registry,
|
||||
recorder,
|
||||
template,
|
||||
|
@ -244,6 +245,7 @@ async def load_registries(hass: core.HomeAssistant) -> None:
|
|||
area_registry.async_load(hass),
|
||||
device_registry.async_load(hass),
|
||||
entity_registry.async_load(hass),
|
||||
folder_registry.async_load(hass),
|
||||
issue_registry.async_load(hass),
|
||||
hass.async_add_executor_job(_cache_uname_processor),
|
||||
template.async_load_custom_templates(hass),
|
||||
|
|
|
@ -26,6 +26,7 @@ SECTIONS = (
|
|||
"core",
|
||||
"device_registry",
|
||||
"entity_registry",
|
||||
"folder_registry",
|
||||
"script",
|
||||
"scene",
|
||||
)
|
||||
|
|
127
homeassistant/components/config/folder_registry.py
Normal file
127
homeassistant/components/config/folder_registry.py
Normal file
|
@ -0,0 +1,127 @@
|
|||
"""Websocket API to interact with the folder registry."""
|
||||
from typing import Any
|
||||
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.components import websocket_api
|
||||
from homeassistant.components.websocket_api.connection import ActiveConnection
|
||||
from homeassistant.core import HomeAssistant, callback
|
||||
from homeassistant.helpers.folder_registry import FolderEntry, async_get
|
||||
|
||||
|
||||
async def async_setup(hass: HomeAssistant) -> bool:
|
||||
"""Register the folder registry WS commands."""
|
||||
websocket_api.async_register_command(hass, websocket_list_folders)
|
||||
websocket_api.async_register_command(hass, websocket_create_folder)
|
||||
websocket_api.async_register_command(hass, websocket_delete_folder)
|
||||
websocket_api.async_register_command(hass, websocket_update_folder)
|
||||
return True
|
||||
|
||||
|
||||
@websocket_api.websocket_command(
|
||||
{
|
||||
vol.Required("type"): "config/folder_registry/list",
|
||||
vol.Required("domain"): str,
|
||||
}
|
||||
)
|
||||
@callback
|
||||
def websocket_list_folders(
|
||||
hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any]
|
||||
) -> None:
|
||||
"""Handle list folders command."""
|
||||
registry = async_get(hass)
|
||||
connection.send_result(
|
||||
msg["id"],
|
||||
[
|
||||
_entry_dict(entry)
|
||||
for entry in registry.async_list_folders(domain=msg["domain"])
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
@websocket_api.websocket_command(
|
||||
{
|
||||
vol.Required("type"): "config/folder_registry/create",
|
||||
vol.Required("domain"): str,
|
||||
vol.Required("name"): str,
|
||||
vol.Optional("icon"): vol.Any(str, None),
|
||||
}
|
||||
)
|
||||
@websocket_api.require_admin
|
||||
@callback
|
||||
def websocket_create_folder(
|
||||
hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any]
|
||||
) -> None:
|
||||
"""Create folder command."""
|
||||
registry = async_get(hass)
|
||||
|
||||
try:
|
||||
entry = registry.async_create(
|
||||
domain=msg["domain"],
|
||||
name=msg["name"],
|
||||
icon=msg.get("icon"),
|
||||
)
|
||||
except ValueError as err:
|
||||
connection.send_error(msg["id"], "invalid_info", str(err))
|
||||
else:
|
||||
connection.send_result(msg["id"], _entry_dict(entry))
|
||||
|
||||
|
||||
@websocket_api.websocket_command(
|
||||
{
|
||||
vol.Required("type"): "config/folder_registry/delete",
|
||||
vol.Required("folder_id"): str,
|
||||
}
|
||||
)
|
||||
@websocket_api.require_admin
|
||||
@callback
|
||||
def websocket_delete_folder(
|
||||
hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any]
|
||||
) -> None:
|
||||
"""Delete folder command."""
|
||||
registry = async_get(hass)
|
||||
|
||||
try:
|
||||
registry.async_delete(msg["folder_id"])
|
||||
except KeyError:
|
||||
connection.send_error(msg["id"], "invalid_info", "Folder ID doesn't exist")
|
||||
else:
|
||||
connection.send_message(websocket_api.result_message(msg["id"], "success"))
|
||||
|
||||
|
||||
@websocket_api.websocket_command(
|
||||
{
|
||||
vol.Required("type"): "config/folder_registry/update",
|
||||
vol.Required("folder_id"): str,
|
||||
vol.Optional("icon"): vol.Any(str, None),
|
||||
vol.Optional("name"): str,
|
||||
}
|
||||
)
|
||||
@websocket_api.require_admin
|
||||
@callback
|
||||
def websocket_update_folder(
|
||||
hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any]
|
||||
) -> None:
|
||||
"""Handle update folder websocket command."""
|
||||
registry = async_get(hass)
|
||||
|
||||
data = dict(msg)
|
||||
data.pop("type")
|
||||
data.pop("id")
|
||||
|
||||
try:
|
||||
entry = registry.async_update(**data)
|
||||
except ValueError as err:
|
||||
connection.send_error(msg["id"], "invalid_info", str(err))
|
||||
else:
|
||||
connection.send_result(msg["id"], _entry_dict(entry))
|
||||
|
||||
|
||||
@callback
|
||||
def _entry_dict(entry: FolderEntry) -> dict[str, Any]:
|
||||
"""Convert entry to API format."""
|
||||
return {
|
||||
"folder_id": entry.folder_id,
|
||||
"icon": entry.icon,
|
||||
"name": entry.name,
|
||||
}
|
226
homeassistant/helpers/folder_registry.py
Normal file
226
homeassistant/helpers/folder_registry.py
Normal file
|
@ -0,0 +1,226 @@
|
|||
"""Folder registry provides a folder structure for other integration to leverage."""
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Iterable, MutableMapping
|
||||
import dataclasses
|
||||
from dataclasses import dataclass, field
|
||||
from typing import cast
|
||||
|
||||
from homeassistant.core import HomeAssistant, callback
|
||||
import homeassistant.util.uuid as uuid_util
|
||||
|
||||
from .typing import UNDEFINED, UndefinedType
|
||||
|
||||
DATA_REGISTRY = "folder_registry"
|
||||
EVENT_FOLDER_REGISTRY_UPDATED = "folder_registry_updated"
|
||||
STORAGE_KEY = "core.folder_registry"
|
||||
STORAGE_VERSION_MAJOR = 1
|
||||
SAVE_DELAY = 10
|
||||
|
||||
|
||||
@dataclass(slots=True, frozen=True)
|
||||
class FolderEntry:
|
||||
"""Folder Registry Entry."""
|
||||
|
||||
domain: str
|
||||
|
||||
name: str
|
||||
normalized_name: str
|
||||
|
||||
folder_id: str = field(default_factory=uuid_util.random_uuid_hex)
|
||||
|
||||
# Meta
|
||||
icon: str | None = None
|
||||
|
||||
|
||||
class FolderRegistry:
|
||||
"""Class to hold a registry of folder."""
|
||||
|
||||
def __init__(self, hass: HomeAssistant) -> None:
|
||||
"""Initialize the folder registry."""
|
||||
self.hass = hass
|
||||
self.folders: MutableMapping[str, FolderEntry] = {}
|
||||
self._store = hass.helpers.storage.Store(
|
||||
STORAGE_VERSION_MAJOR,
|
||||
STORAGE_KEY,
|
||||
atomic_writes=True,
|
||||
)
|
||||
self._folder_idx: dict[str, dict[str, FolderEntry]] = {}
|
||||
|
||||
@callback
|
||||
def async_get_folder(self, folder_id: str) -> FolderEntry | None:
|
||||
"""Get folder by id."""
|
||||
return self.folders.get(folder_id)
|
||||
|
||||
@callback
|
||||
def async_get_folder_by_name(self, domain: str, name: str) -> FolderEntry | None:
|
||||
"""Get folder by name."""
|
||||
if domain not in self._folder_idx:
|
||||
return None
|
||||
|
||||
normalized_name = normalize_name(name)
|
||||
if normalized_name not in self._folder_idx[domain]:
|
||||
return None
|
||||
|
||||
return self._folder_idx[domain][normalized_name]
|
||||
|
||||
@callback
|
||||
def async_list_folders(self, domain: str) -> Iterable[FolderEntry]:
|
||||
"""Get all folders."""
|
||||
if domain not in self._folder_idx:
|
||||
return []
|
||||
return self._folder_idx[domain].values()
|
||||
|
||||
@callback
|
||||
def async_get_or_create(self, domain: str, name: str) -> FolderEntry:
|
||||
"""Get or create an folder."""
|
||||
if folder := self.async_get_folder_by_name(domain, name):
|
||||
return folder
|
||||
return self.async_create(domain, name)
|
||||
|
||||
@callback
|
||||
def async_create(
|
||||
self,
|
||||
domain: str,
|
||||
name: str,
|
||||
*,
|
||||
icon: str | None = None,
|
||||
) -> FolderEntry:
|
||||
"""Create a new folder."""
|
||||
if folder_entry := self.async_get_folder_by_name(domain, name):
|
||||
raise ValueError(
|
||||
f"The name {name} ({folder_entry.normalized_name}) is already in use"
|
||||
)
|
||||
|
||||
folder = FolderEntry(
|
||||
domain=domain,
|
||||
icon=icon,
|
||||
name=name,
|
||||
normalized_name=normalize_name(name),
|
||||
)
|
||||
self.folders[folder.folder_id] = folder
|
||||
if domain not in self._folder_idx:
|
||||
self._folder_idx[domain] = {}
|
||||
self._folder_idx[domain][folder.normalized_name] = folder
|
||||
self.async_schedule_save()
|
||||
self.hass.bus.async_fire(
|
||||
EVENT_FOLDER_REGISTRY_UPDATED,
|
||||
{"action": "create", "domain": domain, "folder_id": folder.folder_id},
|
||||
)
|
||||
return folder
|
||||
|
||||
@callback
|
||||
def async_delete(self, folder_id: str) -> None:
|
||||
"""Delete a folder."""
|
||||
folder = self.folders[folder_id]
|
||||
|
||||
del self.folders[folder_id]
|
||||
del self._folder_idx[folder.domain][folder.normalized_name]
|
||||
|
||||
self.hass.bus.async_fire(
|
||||
EVENT_FOLDER_REGISTRY_UPDATED,
|
||||
{"action": "remove", "domain": folder.domain, "folder_id": folder_id},
|
||||
)
|
||||
|
||||
self.async_schedule_save()
|
||||
|
||||
@callback
|
||||
def async_update(
|
||||
self,
|
||||
folder_id: str,
|
||||
icon: str | None | UndefinedType = UNDEFINED,
|
||||
name: str | UndefinedType = UNDEFINED,
|
||||
) -> FolderEntry:
|
||||
"""Update name of a folder."""
|
||||
old = self.folders[folder_id]
|
||||
changes: dict[str, str | None] = {}
|
||||
|
||||
normalized_name = None
|
||||
if name is not UNDEFINED and name != old.name:
|
||||
normalized_name = normalize_name(name)
|
||||
if (
|
||||
normalized_name != old.normalized_name
|
||||
and self.async_get_folder_by_name(old.domain, name)
|
||||
):
|
||||
raise ValueError(
|
||||
f"The name {name} ({normalized_name}) is already in use"
|
||||
)
|
||||
|
||||
changes["name"] = name
|
||||
changes["normalized_name"] = normalized_name
|
||||
|
||||
if icon is not UNDEFINED and icon != old.icon:
|
||||
changes["icon"] = icon
|
||||
|
||||
if not changes:
|
||||
return old
|
||||
|
||||
new = self.folders[folder_id] = dataclasses.replace(old, **changes)
|
||||
if normalized_name is not None:
|
||||
self._folder_idx[new.domain][normalized_name] = self._folder_idx[
|
||||
old.domain
|
||||
].pop(old.normalized_name)
|
||||
|
||||
self.async_schedule_save()
|
||||
self.hass.bus.async_fire(
|
||||
EVENT_FOLDER_REGISTRY_UPDATED,
|
||||
{"action": "update", "domain": new.domain, "folder_id": folder_id},
|
||||
)
|
||||
|
||||
return new
|
||||
|
||||
async def async_load(self) -> None:
|
||||
"""Load the folder registry."""
|
||||
data = await self._store.async_load()
|
||||
if data is not None:
|
||||
for folder_data in data["folders"]:
|
||||
folder = FolderEntry(
|
||||
domain=folder_data["domain"],
|
||||
folder_id=folder_data["folder_id"],
|
||||
icon=folder_data["icon"],
|
||||
name=folder_data["name"],
|
||||
normalized_name=normalize_name(folder_data["name"]),
|
||||
)
|
||||
if folder.domain not in self._folder_idx:
|
||||
self._folder_idx[folder.domain] = {}
|
||||
self.folders[folder.folder_id] = folder
|
||||
self._folder_idx[folder.domain][folder.normalized_name] = folder
|
||||
|
||||
@callback
|
||||
def async_schedule_save(self) -> None:
|
||||
"""Schedule saving the folder registry."""
|
||||
self._store.async_delay_save(self._data_to_save, SAVE_DELAY)
|
||||
|
||||
@callback
|
||||
def _data_to_save(self) -> dict[str, list[dict[str, str | None]]]:
|
||||
"""Return data of folder registry to store in a file."""
|
||||
return {
|
||||
"folders": [
|
||||
{
|
||||
"domain": entry.domain,
|
||||
"icon": entry.icon,
|
||||
"folder_id": entry.folder_id,
|
||||
"name": entry.name,
|
||||
}
|
||||
for entry in self.folders.values()
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
@callback
|
||||
def async_get(hass: HomeAssistant) -> FolderRegistry:
|
||||
"""Get folder registry."""
|
||||
return cast(FolderRegistry, hass.data[DATA_REGISTRY])
|
||||
|
||||
|
||||
async def async_load(hass: HomeAssistant) -> None:
|
||||
"""Load folder registry."""
|
||||
assert DATA_REGISTRY not in hass.data
|
||||
hass.data[DATA_REGISTRY] = FolderRegistry(hass)
|
||||
|
||||
await hass.data[DATA_REGISTRY].async_load()
|
||||
|
||||
|
||||
def normalize_name(name: str) -> str:
|
||||
"""Normalize an folder name by removing whitespace and case folding."""
|
||||
return name.casefold().replace(" ", "")
|
|
@ -283,6 +283,7 @@ voluptuous = "vol"
|
|||
"homeassistant.helpers.config_validation" = "cv"
|
||||
"homeassistant.helpers.device_registry" = "dr"
|
||||
"homeassistant.helpers.entity_registry" = "er"
|
||||
"homeassistant.helpers.folder_registry" = "fr"
|
||||
"homeassistant.helpers.issue_registry" = "ir"
|
||||
|
||||
[tool.ruff.flake8-pytest-style]
|
||||
|
|
|
@ -57,6 +57,7 @@ from homeassistant.helpers import (
|
|||
entity,
|
||||
entity_platform,
|
||||
entity_registry as er,
|
||||
folder_registry as fr,
|
||||
intent,
|
||||
issue_registry as ir,
|
||||
recorder as recorder_helper,
|
||||
|
@ -256,6 +257,7 @@ async def async_test_home_assistant(event_loop, load_registries=True):
|
|||
ar.async_load(hass),
|
||||
dr.async_load(hass),
|
||||
er.async_load(hass),
|
||||
fr.async_load(hass),
|
||||
ir.async_load(hass),
|
||||
)
|
||||
hass.data[bootstrap.DATA_REGISTRIES_LOADED] = None
|
||||
|
|
249
tests/components/config/test_folder_registry.py
Normal file
249
tests/components/config/test_folder_registry.py
Normal file
|
@ -0,0 +1,249 @@
|
|||
"""Test folder registry API."""
|
||||
from collections.abc import Awaitable, Callable, Generator
|
||||
from typing import Any
|
||||
|
||||
from aiohttp import ClientWebSocketResponse
|
||||
import pytest
|
||||
|
||||
from homeassistant.components.config import folder_registry as config_folder_registry
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers import folder_registry as fr
|
||||
|
||||
from tests.common import ANY
|
||||
|
||||
|
||||
@pytest.fixture(name="client")
|
||||
def client_fixture(
|
||||
hass: HomeAssistant,
|
||||
hass_ws_client: Callable[
|
||||
[HomeAssistant], Awaitable[Generator[ClientWebSocketResponse, Any, Any]]
|
||||
],
|
||||
) -> Generator[ClientWebSocketResponse, None, None]:
|
||||
"""Fixture that can interact with the config manager API."""
|
||||
hass.loop.run_until_complete(config_folder_registry.async_setup(hass))
|
||||
return hass.loop.run_until_complete(hass_ws_client(hass))
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("hass")
|
||||
async def test_list_folders(
|
||||
client: ClientWebSocketResponse,
|
||||
folder_registry: fr.FolderRegistry,
|
||||
) -> None:
|
||||
"""Test list entries."""
|
||||
folder_registry.async_create("automation", "mock 1")
|
||||
folder_registry.async_create(
|
||||
domain="automation",
|
||||
name="mock 2",
|
||||
icon="mdi:two",
|
||||
)
|
||||
|
||||
assert len(folder_registry.folders) == 2
|
||||
|
||||
await client.send_json(
|
||||
{"id": 1, "type": "config/folder_registry/list", "domain": "automation"}
|
||||
)
|
||||
|
||||
msg = await client.receive_json()
|
||||
|
||||
assert len(msg["result"]) == len(folder_registry.folders)
|
||||
assert msg["result"][0] == {
|
||||
"folder_id": ANY,
|
||||
"icon": None,
|
||||
"name": "mock 1",
|
||||
}
|
||||
assert msg["result"][1] == {
|
||||
"folder_id": ANY,
|
||||
"icon": "mdi:two",
|
||||
"name": "mock 2",
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("hass")
|
||||
async def test_create_folder(
|
||||
client: ClientWebSocketResponse,
|
||||
folder_registry: fr.FolderRegistry,
|
||||
) -> None:
|
||||
"""Test create entry."""
|
||||
await client.send_json(
|
||||
{
|
||||
"id": 1,
|
||||
"type": "config/folder_registry/create",
|
||||
"domain": "automation",
|
||||
"name": "Bedroom",
|
||||
}
|
||||
)
|
||||
|
||||
msg = await client.receive_json()
|
||||
|
||||
assert len(folder_registry.folders) == 1
|
||||
assert msg["result"] == {
|
||||
"folder_id": ANY,
|
||||
"icon": None,
|
||||
"name": "Bedroom",
|
||||
}
|
||||
|
||||
await client.send_json(
|
||||
{
|
||||
"id": 2,
|
||||
"type": "config/folder_registry/create",
|
||||
"domain": "automation",
|
||||
"name": "Kitchen",
|
||||
"icon": "mdi:kitchen",
|
||||
}
|
||||
)
|
||||
|
||||
msg = await client.receive_json()
|
||||
|
||||
assert len(folder_registry.folders) == 2
|
||||
assert msg["result"] == {
|
||||
"folder_id": ANY,
|
||||
"icon": "mdi:kitchen",
|
||||
"name": "Kitchen",
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("hass")
|
||||
async def test_create_folder_with_name_already_in_use(
|
||||
client: ClientWebSocketResponse,
|
||||
folder_registry: fr.FolderRegistry,
|
||||
) -> None:
|
||||
"""Test create entry that should fail."""
|
||||
folder_registry.async_create("automation", "Garden")
|
||||
assert len(folder_registry.folders) == 1
|
||||
|
||||
await client.send_json(
|
||||
{
|
||||
"id": 1,
|
||||
"type": "config/folder_registry/create",
|
||||
"domain": "automation",
|
||||
"name": "garden",
|
||||
}
|
||||
)
|
||||
|
||||
msg = await client.receive_json()
|
||||
|
||||
assert not msg["success"]
|
||||
assert msg["error"]["code"] == "invalid_info"
|
||||
assert msg["error"]["message"] == "The name garden (garden) is already in use"
|
||||
assert len(folder_registry.folders) == 1
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("hass")
|
||||
async def test_delete_folder(
|
||||
client: ClientWebSocketResponse,
|
||||
folder_registry: fr.FolderRegistry,
|
||||
) -> None:
|
||||
"""Test delete entry."""
|
||||
folder = folder_registry.async_create("automation", "Mancave")
|
||||
assert len(folder_registry.folders) == 1
|
||||
|
||||
await client.send_json(
|
||||
{
|
||||
"id": 1,
|
||||
"folder_id": folder.folder_id,
|
||||
"type": "config/folder_registry/delete",
|
||||
}
|
||||
)
|
||||
|
||||
msg = await client.receive_json()
|
||||
|
||||
assert msg["success"]
|
||||
assert not folder_registry.folders
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("hass")
|
||||
async def test_delete_non_existing_folder(
|
||||
client: ClientWebSocketResponse,
|
||||
folder_registry: fr.FolderRegistry,
|
||||
) -> None:
|
||||
"""Test delete entry that should fail."""
|
||||
folder_registry.async_create("automation", "Garage")
|
||||
assert len(folder_registry.folders) == 1
|
||||
|
||||
await client.send_json(
|
||||
{"id": 1, "folder_id": "omg_puppies", "type": "config/folder_registry/delete"}
|
||||
)
|
||||
|
||||
msg = await client.receive_json()
|
||||
|
||||
assert not msg["success"]
|
||||
assert msg["error"]["code"] == "invalid_info"
|
||||
assert msg["error"]["message"] == "Folder ID doesn't exist"
|
||||
assert len(folder_registry.folders) == 1
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("hass")
|
||||
async def test_update_folder(
|
||||
client: ClientWebSocketResponse,
|
||||
folder_registry: fr.FolderRegistry,
|
||||
) -> None:
|
||||
"""Test update entry."""
|
||||
folder = folder_registry.async_create("automation", "Office")
|
||||
assert len(folder_registry.folders) == 1
|
||||
|
||||
await client.send_json(
|
||||
{
|
||||
"id": 1,
|
||||
"type": "config/folder_registry/update",
|
||||
"folder_id": folder.folder_id,
|
||||
"name": "Baby's Room",
|
||||
"icon": "mdi:baby",
|
||||
}
|
||||
)
|
||||
|
||||
msg = await client.receive_json()
|
||||
|
||||
assert len(folder_registry.folders) == 1
|
||||
assert msg["result"] == {
|
||||
"folder_id": folder.folder_id,
|
||||
"icon": "mdi:baby",
|
||||
"name": "Baby's Room",
|
||||
}
|
||||
|
||||
await client.send_json(
|
||||
{
|
||||
"id": 2,
|
||||
"type": "config/folder_registry/update",
|
||||
"folder_id": folder.folder_id,
|
||||
"name": "Todler's Room",
|
||||
"icon": None,
|
||||
}
|
||||
)
|
||||
|
||||
msg = await client.receive_json()
|
||||
|
||||
assert len(folder_registry.folders) == 1
|
||||
assert msg["result"] == {
|
||||
"icon": None,
|
||||
"folder_id": folder.folder_id,
|
||||
"name": "Todler's Room",
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("hass")
|
||||
async def test_update_with_name_already_in_use(
|
||||
client: ClientWebSocketResponse,
|
||||
folder_registry: fr.FolderRegistry,
|
||||
) -> None:
|
||||
"""Test update entry."""
|
||||
folder = folder_registry.async_create("automation", "Notifications")
|
||||
folder_registry.async_create("automation", "Living room")
|
||||
assert len(folder_registry.folders) == 2
|
||||
|
||||
await client.send_json(
|
||||
{
|
||||
"id": 1,
|
||||
"folder_id": folder.folder_id,
|
||||
"name": "Living room",
|
||||
"type": "config/folder_registry/update",
|
||||
}
|
||||
)
|
||||
|
||||
msg = await client.receive_json()
|
||||
|
||||
assert not msg["success"]
|
||||
assert msg["error"]["code"] == "invalid_info"
|
||||
assert (
|
||||
msg["error"]["message"] == "The name Living room (livingroom) is already in use"
|
||||
)
|
||||
assert len(folder_registry.folders) == 2
|
|
@ -54,6 +54,7 @@ from homeassistant.helpers import (
|
|||
device_registry as dr,
|
||||
entity_registry as er,
|
||||
event,
|
||||
folder_registry as fr,
|
||||
issue_registry as ir,
|
||||
recorder as recorder_helper,
|
||||
)
|
||||
|
@ -1587,6 +1588,12 @@ def entity_registry(hass: HomeAssistant) -> er.EntityRegistry:
|
|||
return er.async_get(hass)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def folder_registry(hass: HomeAssistant) -> fr.FolderRegistry:
|
||||
"""Return the folder registry from the current hass instance."""
|
||||
return fr.async_get(hass)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def issue_registry(hass: HomeAssistant) -> ir.IssueRegistry:
|
||||
"""Return the issue registry from the current hass instance."""
|
||||
|
|
325
tests/helpers/test_folder_registry.py
Normal file
325
tests/helpers/test_folder_registry.py
Normal file
|
@ -0,0 +1,325 @@
|
|||
"""Tests for the folder registry."""
|
||||
import re
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers import folder_registry as fr
|
||||
from homeassistant.helpers.folder_registry import (
|
||||
EVENT_FOLDER_REGISTRY_UPDATED,
|
||||
STORAGE_KEY,
|
||||
STORAGE_VERSION_MAJOR,
|
||||
FolderRegistry,
|
||||
async_get,
|
||||
async_load,
|
||||
)
|
||||
|
||||
from tests.common import async_capture_events, flush_store
|
||||
|
||||
|
||||
async def test_create_folder(
|
||||
hass: HomeAssistant, folder_registry: fr.FolderRegistry
|
||||
) -> None:
|
||||
"""Make sure that we can create folders."""
|
||||
update_events = async_capture_events(hass, EVENT_FOLDER_REGISTRY_UPDATED)
|
||||
folder = folder_registry.async_create(
|
||||
domain="test",
|
||||
name="My Folder",
|
||||
icon="mdi:test",
|
||||
)
|
||||
|
||||
assert folder.folder_id
|
||||
assert folder.name == "My Folder"
|
||||
assert folder.normalized_name == "myfolder"
|
||||
|
||||
assert len(folder_registry.folders) == 1
|
||||
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert len(update_events) == 1
|
||||
assert update_events[0].data == {
|
||||
"action": "create",
|
||||
"domain": "test",
|
||||
"folder_id": folder.folder_id,
|
||||
}
|
||||
|
||||
|
||||
async def test_create_folder_with_name_already_in_use(
|
||||
hass: HomeAssistant, folder_registry: fr.FolderRegistry
|
||||
) -> None:
|
||||
"""Make sure that we can't create a folder with a name already in use."""
|
||||
update_events = async_capture_events(hass, EVENT_FOLDER_REGISTRY_UPDATED)
|
||||
folder_registry.async_create("test", "mock")
|
||||
|
||||
with pytest.raises(
|
||||
ValueError, match=re.escape("The name mock (mock) is already in use")
|
||||
):
|
||||
folder_registry.async_create("test", "mock")
|
||||
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert len(folder_registry.folders) == 1
|
||||
assert len(update_events) == 1
|
||||
|
||||
|
||||
async def test_delete_folder(
|
||||
hass: HomeAssistant, folder_registry: fr.FolderRegistry
|
||||
) -> None:
|
||||
"""Make sure that we can delete a folder."""
|
||||
update_events = async_capture_events(hass, EVENT_FOLDER_REGISTRY_UPDATED)
|
||||
folder = folder_registry.async_create("automation", "My living room automations")
|
||||
assert len(folder_registry.folders) == 1
|
||||
|
||||
folder_registry.async_delete(folder.folder_id)
|
||||
|
||||
assert not folder_registry.folders
|
||||
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert len(update_events) == 2
|
||||
assert update_events[0].data == {
|
||||
"action": "create",
|
||||
"domain": "automation",
|
||||
"folder_id": folder.folder_id,
|
||||
}
|
||||
assert update_events[1].data == {
|
||||
"action": "remove",
|
||||
"domain": "automation",
|
||||
"folder_id": folder.folder_id,
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("hass")
|
||||
async def test_delete_non_existing_folder(folder_registry: fr.FolderRegistry) -> None:
|
||||
"""Make sure that we can't delete a folder that doesn't exist."""
|
||||
folder_registry.async_create("test", "mock")
|
||||
|
||||
with pytest.raises(KeyError):
|
||||
folder_registry.async_delete("")
|
||||
|
||||
assert len(folder_registry.folders) == 1
|
||||
|
||||
|
||||
async def test_update_folder(
|
||||
hass: HomeAssistant, folder_registry: fr.FolderRegistry
|
||||
) -> None:
|
||||
"""Make sure that we can update folders."""
|
||||
update_events = async_capture_events(hass, EVENT_FOLDER_REGISTRY_UPDATED)
|
||||
folder = folder_registry.async_create("script", "My notification script")
|
||||
|
||||
assert len(folder_registry.folders) == 1
|
||||
assert folder.folder_id
|
||||
assert folder.name == "My notification script"
|
||||
assert folder.normalized_name == "mynotificationscript"
|
||||
assert folder.icon is None
|
||||
|
||||
updated_folder = folder_registry.async_update(
|
||||
folder.folder_id,
|
||||
name="My notification thingies",
|
||||
icon="mdi:update",
|
||||
)
|
||||
|
||||
assert updated_folder != folder
|
||||
assert updated_folder.folder_id == folder.folder_id
|
||||
assert updated_folder.name == "My notification thingies"
|
||||
assert updated_folder.normalized_name == "mynotificationthingies"
|
||||
assert updated_folder.icon == "mdi:update"
|
||||
|
||||
assert len(folder_registry.folders) == 1
|
||||
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert len(update_events) == 2
|
||||
assert update_events[0].data == {
|
||||
"action": "create",
|
||||
"domain": "script",
|
||||
"folder_id": folder.folder_id,
|
||||
}
|
||||
assert update_events[1].data == {
|
||||
"action": "update",
|
||||
"domain": "script",
|
||||
"folder_id": folder.folder_id,
|
||||
}
|
||||
|
||||
|
||||
async def test_update_folder_with_same_data(
|
||||
hass: HomeAssistant, folder_registry: fr.FolderRegistry
|
||||
) -> None:
|
||||
"""Make sure that we can reapply the same data to the folder and it won't update."""
|
||||
update_events = async_capture_events(hass, EVENT_FOLDER_REGISTRY_UPDATED)
|
||||
folder = folder_registry.async_create(
|
||||
"zone",
|
||||
"Shops",
|
||||
icon="mdi:shopping-cart",
|
||||
)
|
||||
|
||||
updated_folder = folder_registry.async_update(
|
||||
folder_id=folder.folder_id,
|
||||
name="Shops",
|
||||
icon="mdi:shopping-cart",
|
||||
)
|
||||
assert folder == updated_folder
|
||||
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# No update event
|
||||
assert len(update_events) == 1
|
||||
assert update_events[0].data == {
|
||||
"action": "create",
|
||||
"domain": "zone",
|
||||
"folder_id": folder.folder_id,
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("hass")
|
||||
async def test_update_folder_with_same_name_change_case(
|
||||
folder_registry: fr.FolderRegistry,
|
||||
) -> None:
|
||||
"""Make sure that we can reapply the same name with a different case to the folder."""
|
||||
folder = folder_registry.async_create("automation", "frenck")
|
||||
|
||||
updated_folder = folder_registry.async_update(folder.folder_id, name="Frenck")
|
||||
|
||||
assert updated_folder.name == "Frenck"
|
||||
assert updated_folder.folder_id == folder.folder_id
|
||||
assert updated_folder.normalized_name == folder.normalized_name
|
||||
assert len(folder_registry.folders) == 1
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("hass")
|
||||
async def test_update_folder_with_name_already_in_use(
|
||||
folder_registry: fr.FolderRegistry,
|
||||
) -> None:
|
||||
"""Make sure that we can't update an folder with a name already in use."""
|
||||
folder1 = folder_registry.async_create("automation", "Kitchen")
|
||||
folder2 = folder_registry.async_create("automation", "Another kitchen")
|
||||
|
||||
with pytest.raises(
|
||||
ValueError, match=re.escape("The name kitchen (kitchen) is already in use")
|
||||
):
|
||||
folder_registry.async_update(folder2.folder_id, name="kitchen")
|
||||
|
||||
assert folder1.name == "Kitchen"
|
||||
assert folder2.name == "Another kitchen"
|
||||
assert len(folder_registry.folders) == 2
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("hass")
|
||||
async def test_update_folder_with_normalized_name_already_in_use(
|
||||
folder_registry: fr.FolderRegistry,
|
||||
) -> None:
|
||||
"""Make sure that we can't update a folder with a normalized name already in use."""
|
||||
folder1 = folder_registry.async_create("scripts", "mock1")
|
||||
folder2 = folder_registry.async_create("scripts", "M O C K 2")
|
||||
|
||||
with pytest.raises(
|
||||
ValueError, match=re.escape("The name mock2 (mock2) is already in use")
|
||||
):
|
||||
folder_registry.async_update(folder1.folder_id, name="mock2")
|
||||
|
||||
assert folder1.name == "mock1"
|
||||
assert folder2.name == "M O C K 2"
|
||||
assert len(folder_registry.folders) == 2
|
||||
|
||||
|
||||
async def test_load_folders(
|
||||
hass: HomeAssistant, folder_registry: fr.FolderRegistry
|
||||
) -> None:
|
||||
"""Make sure that we can load/save data correctly."""
|
||||
folder1 = folder_registry.async_create(
|
||||
"automation",
|
||||
"One",
|
||||
icon="mdi:one",
|
||||
)
|
||||
folder2 = folder_registry.async_create(
|
||||
"script",
|
||||
"Two",
|
||||
icon="mdi:two",
|
||||
)
|
||||
|
||||
assert len(folder_registry.folders) == 2
|
||||
|
||||
registry2 = FolderRegistry(hass)
|
||||
await flush_store(folder_registry._store)
|
||||
await registry2.async_load()
|
||||
|
||||
assert len(registry2.folders) == 2
|
||||
assert list(folder_registry.folders) == list(registry2.folders)
|
||||
|
||||
folder1_registry2 = registry2.async_get_or_create("automation", "One")
|
||||
assert folder1_registry2.folder_id == folder1.folder_id
|
||||
assert folder1_registry2.domain == folder1.domain
|
||||
assert folder1_registry2.name == folder1.name
|
||||
assert folder1_registry2.icon == folder1.icon
|
||||
assert folder1_registry2.normalized_name == folder1.normalized_name
|
||||
|
||||
folder2_registry2 = registry2.async_get_or_create("script", "Two")
|
||||
assert folder2_registry2.folder_id == folder2.folder_id
|
||||
assert folder2_registry2.domain == folder2.domain
|
||||
assert folder2_registry2.name == folder2.name
|
||||
assert folder2_registry2.icon == folder2.icon
|
||||
assert folder2_registry2.normalized_name == folder2.normalized_name
|
||||
|
||||
|
||||
@pytest.mark.parametrize("load_registries", [False])
|
||||
async def test_loading_folders_from_storage(
|
||||
hass: HomeAssistant, hass_storage: Any
|
||||
) -> None:
|
||||
"""Test loading stored folders on start."""
|
||||
hass_storage[STORAGE_KEY] = {
|
||||
"version": STORAGE_VERSION_MAJOR,
|
||||
"data": {
|
||||
"folders": [
|
||||
{
|
||||
"domain": "automation",
|
||||
"icon": "mdi:one",
|
||||
"folder_id": "uuid1",
|
||||
"name": "One",
|
||||
},
|
||||
{
|
||||
"domain": "script",
|
||||
"icon": None,
|
||||
"folder_id": "uuid2",
|
||||
"name": "Two",
|
||||
},
|
||||
]
|
||||
},
|
||||
}
|
||||
|
||||
await async_load(hass)
|
||||
registry = async_get(hass)
|
||||
|
||||
assert len(registry.folders) == 2
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("hass")
|
||||
async def test_getting_folders(folder_registry: fr.FolderRegistry) -> None:
|
||||
"""Make sure we can get the folderrs by name."""
|
||||
folder1 = folder_registry.async_get_or_create("automation", "Living room")
|
||||
folder2 = folder_registry.async_get_or_create("automation", "living room")
|
||||
folder3 = folder_registry.async_get_or_create("automation", "living room")
|
||||
|
||||
assert folder1 == folder2
|
||||
assert folder1 == folder3
|
||||
assert folder2 == folder3
|
||||
|
||||
get_folder = folder_registry.async_get_folder_by_name(
|
||||
"automation", "l i v i n g r o o m"
|
||||
)
|
||||
assert get_folder == folder1
|
||||
|
||||
get_folder = folder_registry.async_get_folder(folder1.folder_id)
|
||||
assert get_folder == folder1
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("hass")
|
||||
async def test_async_get_folder_by_name_not_found(
|
||||
folder_registry: fr.FolderRegistry,
|
||||
) -> None:
|
||||
"""Make sure we return None for non-existent folders."""
|
||||
folder_registry.async_create("automation", "Bathroom")
|
||||
|
||||
assert len(folder_registry.folders) == 1
|
||||
|
||||
assert folder_registry.async_get_folder_by_name("automation", "mancave") is None
|
|
@ -28,6 +28,7 @@ from homeassistant.helpers import (
|
|||
area_registry as ar,
|
||||
device_registry as dr,
|
||||
entity_registry as er,
|
||||
folder_registry as fr,
|
||||
issue_registry as ir,
|
||||
)
|
||||
|
||||
|
@ -64,6 +65,10 @@ class FlowResultSnapshot(dict):
|
|||
"""Tiny wrapper to represent a flow result in snapshots."""
|
||||
|
||||
|
||||
class FolderRegistryEntrySnapshot(dict):
|
||||
"""Tiny wrapper to represent an folder registry entry in snapshots."""
|
||||
|
||||
|
||||
class IssueRegistryItemSnapshot(dict):
|
||||
"""Tiny wrapper to represent an entity registry entry in snapshots."""
|
||||
|
||||
|
@ -101,6 +106,8 @@ class HomeAssistantSnapshotSerializer(AmberDataSerializer):
|
|||
serializable_data = cls._serializable_device_registry_entry(data)
|
||||
elif isinstance(data, er.RegistryEntry):
|
||||
serializable_data = cls._serializable_entity_registry_entry(data)
|
||||
elif isinstance(data, fr.FolderEntry):
|
||||
serializable_data = cls._serializable_issue_registry_entry(data)
|
||||
elif isinstance(data, ir.IssueEntry):
|
||||
serializable_data = cls._serializable_issue_registry_entry(data)
|
||||
elif isinstance(data, dict) and "flow_id" in data and "handler" in data:
|
||||
|
@ -182,6 +189,13 @@ class HomeAssistantSnapshotSerializer(AmberDataSerializer):
|
|||
"""Prepare a Home Assistant flow result for serialization."""
|
||||
return FlowResultSnapshot(data | {"flow_id": ANY})
|
||||
|
||||
@classmethod
|
||||
def _serializable_folder_registry_entry(
|
||||
cls, data: fr.FolderEntry
|
||||
) -> SerializableData:
|
||||
"""Prepare a Home Assistant folder registry entry for serialization."""
|
||||
return FolderRegistryEntrySnapshot(dataclasses.asdict(data))
|
||||
|
||||
@classmethod
|
||||
def _serializable_issue_registry_entry(
|
||||
cls, data: ir.IssueEntry
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue