Add floor registry (#110741)

Co-authored-by: Paulus Schoutsen <balloob@gmail.com>
Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
This commit is contained in:
Franck Nijhof 2024-02-17 21:21:15 +01:00 committed by GitHub
parent 1ded412061
commit 4570eed6f6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 1104 additions and 4 deletions

View file

@ -32,6 +32,7 @@ from .helpers import (
device_registry,
entity,
entity_registry,
floor_registry,
issue_registry,
recorder,
restore_state,
@ -301,6 +302,7 @@ async def async_load_base_functionality(hass: core.HomeAssistant) -> None:
area_registry.async_load(hass),
device_registry.async_load(hass),
entity_registry.async_load(hass),
floor_registry.async_load(hass),
issue_registry.async_load(hass),
hass.async_add_executor_job(_cache_uname_processor),
template.async_load_custom_templates(hass),

View file

@ -36,6 +36,7 @@ SECTIONS = (
"core",
"device_registry",
"entity_registry",
"floor_registry",
"script",
"scene",
)

View file

@ -0,0 +1,126 @@
"""Websocket API to interact with the floor registry."""
from typing import Any
import voluptuous as vol
from homeassistant.components import websocket_api
from homeassistant.components.websocket_api.connection import ActiveConnection
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.floor_registry import FloorEntry, async_get
@callback
def async_setup(hass: HomeAssistant) -> bool:
"""Register the floor registry WS commands."""
websocket_api.async_register_command(hass, websocket_list_floors)
websocket_api.async_register_command(hass, websocket_create_floor)
websocket_api.async_register_command(hass, websocket_delete_floor)
websocket_api.async_register_command(hass, websocket_update_floor)
return True
@websocket_api.websocket_command(
{
vol.Required("type"): "config/floor_registry/list",
}
)
@callback
def websocket_list_floors(
hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any]
) -> None:
"""Handle list floors command."""
registry = async_get(hass)
connection.send_result(
msg["id"],
[_entry_dict(entry) for entry in registry.async_list_floors()],
)
@websocket_api.websocket_command(
{
vol.Required("type"): "config/floor_registry/create",
vol.Required("name"): str,
vol.Optional("icon"): vol.Any(str, None),
vol.Optional("level"): int,
}
)
@websocket_api.require_admin
@callback
def websocket_create_floor(
hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any]
) -> None:
"""Create floor command."""
registry = async_get(hass)
data = dict(msg)
data.pop("type")
data.pop("id")
try:
entry = registry.async_create(**data)
except ValueError as err:
connection.send_error(msg["id"], "invalid_info", str(err))
else:
connection.send_result(msg["id"], _entry_dict(entry))
@websocket_api.websocket_command(
{
vol.Required("type"): "config/floor_registry/delete",
vol.Required("floor_id"): str,
}
)
@websocket_api.require_admin
@callback
def websocket_delete_floor(
hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any]
) -> None:
"""Delete floor command."""
registry = async_get(hass)
try:
registry.async_delete(msg["floor_id"])
except KeyError:
connection.send_error(msg["id"], "invalid_info", "Floor ID doesn't exist")
else:
connection.send_result(msg["id"])
@websocket_api.websocket_command(
{
vol.Required("type"): "config/floor_registry/update",
vol.Required("floor_id"): str,
vol.Optional("icon"): vol.Any(str, None),
vol.Optional("level"): int,
vol.Optional("name"): str,
}
)
@websocket_api.require_admin
@callback
def websocket_update_floor(
hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any]
) -> None:
"""Handle update floor websocket command."""
registry = async_get(hass)
data = dict(msg)
data.pop("type")
data.pop("id")
try:
entry = registry.async_update(**data)
except ValueError as err:
connection.send_error(msg["id"], "invalid_info", str(err))
else:
connection.send_result(msg["id"], _entry_dict(entry))
@callback
def _entry_dict(entry: FloorEntry) -> dict[str, Any]:
"""Convert entry to API format."""
return {
"floor_id": entry.floor_id,
"icon": entry.icon,
"level": entry.level,
"name": entry.name,
}

View file

@ -6,7 +6,7 @@ from collections.abc import Iterable, ValuesView
import dataclasses
from typing import Any, Literal, TypedDict, cast
from homeassistant.core import HomeAssistant, callback
from homeassistant.core import Event, HomeAssistant, callback
from homeassistant.util import slugify
from . import device_registry as dr, entity_registry as er
@ -17,7 +17,7 @@ DATA_REGISTRY = "area_registry"
EVENT_AREA_REGISTRY_UPDATED = "area_registry_updated"
STORAGE_KEY = "core.area_registry"
STORAGE_VERSION_MAJOR = 1
STORAGE_VERSION_MINOR = 4
STORAGE_VERSION_MINOR = 5
SAVE_DELAY = 10
@ -33,6 +33,7 @@ class AreaEntry:
"""Area Registry Entry."""
aliases: set[str]
floor_id: str | None
icon: str | None
id: str
name: str
@ -113,6 +114,11 @@ class AreaRegistryStore(Store[dict[str, list[dict[str, Any]]]]):
for area in old_data["areas"]:
area["icon"] = None
if old_minor_version < 5:
# Version 1.5 adds floor_id
for area in old_data["areas"]:
area["floor_id"] = None
if old_major_version > 1:
raise NotImplementedError
return old_data
@ -167,6 +173,7 @@ class AreaRegistry:
name: str,
*,
aliases: set[str] | None = None,
floor_id: str | None = None,
icon: str | None = None,
picture: str | None = None,
) -> AreaEntry:
@ -179,6 +186,7 @@ class AreaRegistry:
area_id = self._generate_area_id(name)
area = AreaEntry(
aliases=aliases or set(),
floor_id=floor_id,
icon=icon,
id=area_id,
name=name,
@ -215,6 +223,7 @@ class AreaRegistry:
area_id: str,
*,
aliases: set[str] | UndefinedType = UNDEFINED,
floor_id: str | None | UndefinedType = UNDEFINED,
icon: str | None | UndefinedType = UNDEFINED,
name: str | UndefinedType = UNDEFINED,
picture: str | None | UndefinedType = UNDEFINED,
@ -223,6 +232,7 @@ class AreaRegistry:
updated = self._async_update(
area_id,
aliases=aliases,
floor_id=floor_id,
icon=icon,
name=name,
picture=picture,
@ -238,6 +248,7 @@ class AreaRegistry:
area_id: str,
*,
aliases: set[str] | UndefinedType = UNDEFINED,
floor_id: str | None | UndefinedType = UNDEFINED,
icon: str | None | UndefinedType = UNDEFINED,
name: str | UndefinedType = UNDEFINED,
picture: str | None | UndefinedType = UNDEFINED,
@ -251,6 +262,7 @@ class AreaRegistry:
("aliases", aliases),
("icon", icon),
("picture", picture),
("floor_id", floor_id),
):
if value is not UNDEFINED and value != getattr(old, attr_name):
new_values[attr_name] = value
@ -269,6 +281,8 @@ class AreaRegistry:
async def async_load(self) -> None:
"""Load the area registry."""
self._async_setup_cleanup()
data = await self._store.async_load()
areas = AreaRegistryItems()
@ -279,6 +293,7 @@ class AreaRegistry:
normalized_name = normalize_area_name(area["name"])
areas[area["id"]] = AreaEntry(
aliases=set(area["aliases"]),
floor_id=area["floor_id"],
icon=area["icon"],
id=area["id"],
name=area["name"],
@ -302,6 +317,7 @@ class AreaRegistry:
data["areas"] = [
{
"aliases": list(entry.aliases),
"floor_id": entry.floor_id,
"icon": entry.icon,
"id": entry.id,
"name": entry.name,
@ -321,6 +337,31 @@ class AreaRegistry:
suggestion = f"{suggestion_base}_{tries}"
return suggestion
@callback
def _async_setup_cleanup(self) -> None:
"""Set up the area registry cleanup."""
# pylint: disable-next=import-outside-toplevel
from . import floor_registry as fr # Circular dependency
@callback
def _floor_removed_from_registry_filter(event: Event) -> bool:
"""Filter all except for the remove action from floor registry events."""
return bool(event.data["action"] == "remove")
@callback
def _handle_floor_registry_update(event: Event) -> None:
"""Update areas that are associated with a floor that has been removed."""
floor_id = event.data["floor_id"]
for area_id, area in self.areas.items():
if floor_id == area.floor_id:
self.async_update(area_id, floor_id=None)
self.hass.bus.async_listen(
event_type=fr.EVENT_FLOOR_REGISTRY_UPDATED,
event_filter=_floor_removed_from_registry_filter,
listener=_handle_floor_registry_update,
)
@callback
def async_get(hass: HomeAssistant) -> AreaRegistry:
@ -335,6 +376,12 @@ async def async_load(hass: HomeAssistant) -> None:
await hass.data[DATA_REGISTRY].async_load()
@callback
def async_entries_for_floor(registry: AreaRegistry, floor_id: str) -> list[AreaEntry]:
"""Return entries that match an floor."""
return [area for area in registry.areas.values() if floor_id == area.floor_id]
def normalize_area_name(area_name: str) -> str:
"""Normalize an area name by removing whitespace and case folding."""
return area_name.casefold().replace(" ", "")

View file

@ -0,0 +1,261 @@
"""Provide a way to assign areas to floors in one's home."""
from __future__ import annotations
from collections import UserDict
from collections.abc import Iterable, ValuesView
import dataclasses
from dataclasses import dataclass
from typing import cast
from homeassistant.core import HomeAssistant, callback
from homeassistant.util import slugify
from .typing import UNDEFINED, UndefinedType
DATA_REGISTRY = "floor_registry"
EVENT_FLOOR_REGISTRY_UPDATED = "floor_registry_updated"
STORAGE_KEY = "core.floor_registry"
STORAGE_VERSION_MAJOR = 1
SAVE_DELAY = 10
@dataclass(slots=True, kw_only=True, frozen=True)
class FloorEntry:
"""Floor registry entry."""
aliases: set[str]
floor_id: str
icon: str | None = None
level: int = 0
name: str
normalized_name: str
class FloorRegistryItems(UserDict[str, FloorEntry]):
"""Container for floor registry items, maps floor id -> entry.
Maintains an additional index:
- normalized name -> entry
"""
def __init__(self) -> None:
"""Initialize the container."""
super().__init__()
self._normalized_names: dict[str, FloorEntry] = {}
def values(self) -> ValuesView[FloorEntry]:
"""Return the underlying values to avoid __iter__ overhead."""
return self.data.values()
def __setitem__(self, key: str, entry: FloorEntry) -> None:
"""Add an item."""
data = self.data
normalized_name = _normalize_floor_name(entry.name)
if key in data:
old_entry = data[key]
if (
normalized_name != old_entry.normalized_name
and normalized_name in self._normalized_names
):
raise ValueError(
f"The name {entry.name} ({normalized_name}) is already in use"
)
del self._normalized_names[old_entry.normalized_name]
data[key] = entry
self._normalized_names[normalized_name] = entry
def __delitem__(self, key: str) -> None:
"""Remove an item."""
entry = self[key]
normalized_name = _normalize_floor_name(entry.name)
del self._normalized_names[normalized_name]
super().__delitem__(key)
def get_floor_by_name(self, name: str) -> FloorEntry | None:
"""Get floor by name."""
return self._normalized_names.get(_normalize_floor_name(name))
class FloorRegistry:
"""Class to hold a registry of floors."""
floors: FloorRegistryItems
_floor_data: dict[str, FloorEntry]
def __init__(self, hass: HomeAssistant) -> None:
"""Initialize the floor registry."""
self.hass = hass
self._store = hass.helpers.storage.Store(
STORAGE_VERSION_MAJOR,
STORAGE_KEY,
atomic_writes=True,
)
@callback
def async_get_floor(self, floor_id: str) -> FloorEntry | None:
"""Get floor by id.
We retrieve the FloorEntry from the underlying dict to avoid
the overhead of the UserDict __getitem__.
"""
return self._floor_data.get(floor_id)
@callback
def async_get_floor_by_name(self, name: str) -> FloorEntry | None:
"""Get floor by name."""
return self.floors.get_floor_by_name(name)
@callback
def async_list_floors(self) -> Iterable[FloorEntry]:
"""Get all floors."""
return self.floors.values()
@callback
def _generate_id(self, name: str) -> str:
"""Generate floor ID."""
suggestion = suggestion_base = slugify(name)
tries = 1
while suggestion in self.floors:
tries += 1
suggestion = f"{suggestion_base}_{tries}"
return suggestion
@callback
def async_create(
self,
name: str,
*,
aliases: set[str] | None = None,
icon: str | None = None,
level: int = 0,
) -> FloorEntry:
"""Create a new floor."""
if floor := self.async_get_floor_by_name(name):
raise ValueError(
f"The name {name} ({floor.normalized_name}) is already in use"
)
normalized_name = _normalize_floor_name(name)
floor = FloorEntry(
aliases=aliases or set(),
icon=icon,
floor_id=self._generate_id(name),
name=name,
normalized_name=normalized_name,
level=level,
)
floor_id = floor.floor_id
self.floors[floor_id] = floor
self.async_schedule_save()
self.hass.bus.async_fire(
EVENT_FLOOR_REGISTRY_UPDATED,
{"action": "create", "floor_id": floor_id},
)
return floor
@callback
def async_delete(self, floor_id: str) -> None:
"""Delete floor."""
del self.floors[floor_id]
self.hass.bus.async_fire(
EVENT_FLOOR_REGISTRY_UPDATED, {"action": "remove", "floor_id": floor_id}
)
self.async_schedule_save()
@callback
def async_update(
self,
floor_id: str,
*,
aliases: set[str] | UndefinedType = UNDEFINED,
icon: str | None | UndefinedType = UNDEFINED,
level: int | UndefinedType = UNDEFINED,
name: str | UndefinedType = UNDEFINED,
) -> FloorEntry:
"""Update name of the floor."""
old = self.floors[floor_id]
changes = {
attr_name: value
for attr_name, value in (
("aliases", aliases),
("icon", icon),
("level", level),
)
if value is not UNDEFINED and value != getattr(old, attr_name)
}
if name is not UNDEFINED and name != old.name:
changes["name"] = name
changes["normalized_name"] = _normalize_floor_name(name)
if not changes:
return old
new = self.floors[floor_id] = dataclasses.replace(old, **changes) # type: ignore[arg-type]
self.async_schedule_save()
self.hass.bus.async_fire(
EVENT_FLOOR_REGISTRY_UPDATED, {"action": "update", "floor_id": floor_id}
)
return new
async def async_load(self) -> None:
"""Load the floor registry."""
data = await self._store.async_load()
floors = FloorRegistryItems()
if data is not None:
for floor in data["floors"]:
normalized_name = _normalize_floor_name(floor["name"])
floors[floor["floor_id"]] = FloorEntry(
aliases=set(floor["aliases"]),
icon=floor["icon"],
floor_id=floor["floor_id"],
name=floor["name"],
level=floor["level"],
normalized_name=normalized_name,
)
self.floors = floors
self._floor_data = floors.data
@callback
def async_schedule_save(self) -> None:
"""Schedule saving the floor registry."""
self._store.async_delay_save(self._data_to_save, SAVE_DELAY)
@callback
def _data_to_save(self) -> dict[str, list[dict[str, str | int | list[str] | None]]]:
"""Return data of floor registry to store in a file."""
return {
"floors": [
{
"aliases": list(entry.aliases),
"floor_id": entry.floor_id,
"icon": entry.icon,
"level": entry.level,
"name": entry.name,
}
for entry in self.floors.values()
]
}
@callback
def async_get(hass: HomeAssistant) -> FloorRegistry:
"""Get floor registry."""
return cast(FloorRegistry, hass.data[DATA_REGISTRY])
async def async_load(hass: HomeAssistant) -> None:
"""Load floor registry."""
assert DATA_REGISTRY not in hass.data
hass.data[DATA_REGISTRY] = FloorRegistry(hass)
await hass.data[DATA_REGISTRY].async_load()
def _normalize_floor_name(floor_name: str) -> str:
"""Normalize a floor name by removing whitespace and case folding."""
return floor_name.casefold().replace(" ", "")

View file

@ -691,6 +691,7 @@ voluptuous = "vol"
"homeassistant.helpers.config_validation" = "cv"
"homeassistant.helpers.device_registry" = "dr"
"homeassistant.helpers.entity_registry" = "er"
"homeassistant.helpers.floor_registry" = "fr"
"homeassistant.helpers.issue_registry" = "ir"
"homeassistant.util.dt" = "dt_util"

View file

@ -63,6 +63,7 @@ from homeassistant.helpers import (
entity_platform,
entity_registry as er,
event,
floor_registry as fr,
intent,
issue_registry as ir,
recorder as recorder_helper,
@ -294,6 +295,7 @@ async def async_test_home_assistant(
ar.async_load(hass),
dr.async_load(hass),
er.async_load(hass),
fr.async_load(hass),
ir.async_load(hass),
rs.async_load(hass),
)

View file

@ -0,0 +1,229 @@
"""Test floor registry API."""
import pytest
from homeassistant.components.config import floor_registry
from homeassistant.core import HomeAssistant
from homeassistant.helpers import floor_registry as fr
from tests.typing import MockHAClientWebSocket, WebSocketGenerator
@pytest.fixture(name="client")
async def client_fixture(
hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> MockHAClientWebSocket:
"""Fixture that can interact with the config manager API."""
floor_registry.async_setup(hass)
return await hass_ws_client(hass)
async def test_list_floors(
client: MockHAClientWebSocket,
floor_registry: fr.FloorRegistry,
) -> None:
"""Test list entries."""
floor_registry.async_create("First floor")
floor_registry.async_create(
name="Second floor",
icon="mdi:home-floor-2",
level=2,
)
assert len(floor_registry.floors) == 2
await client.send_json_auto_id({"type": "config/floor_registry/list"})
msg = await client.receive_json()
assert len(msg["result"]) == len(floor_registry.floors)
assert msg["result"][0] == {
"icon": None,
"floor_id": "first_floor",
"name": "First floor",
"level": 0,
}
assert msg["result"][1] == {
"icon": "mdi:home-floor-2",
"floor_id": "second_floor",
"name": "Second floor",
"level": 2,
}
async def test_create_floor(
client: MockHAClientWebSocket,
floor_registry: fr.FloorRegistry,
) -> None:
"""Test create entry."""
await client.send_json_auto_id(
{"type": "config/floor_registry/create", "name": "First floor"}
)
msg = await client.receive_json()
assert len(floor_registry.floors) == 1
assert msg["result"] == {
"icon": None,
"floor_id": "first_floor",
"name": "First floor",
"level": 0,
}
await client.send_json_auto_id(
{
"name": "Second floor",
"type": "config/floor_registry/create",
"icon": "mdi:home-floor-2",
"level": 2,
}
)
msg = await client.receive_json()
assert len(floor_registry.floors) == 2
assert msg["result"] == {
"icon": "mdi:home-floor-2",
"floor_id": "second_floor",
"name": "Second floor",
"level": 2,
}
async def test_create_floor_with_name_already_in_use(
client: MockHAClientWebSocket,
floor_registry: fr.FloorRegistry,
) -> None:
"""Test create entry that should fail."""
floor_registry.async_create("First floor")
assert len(floor_registry.floors) == 1
await client.send_json_auto_id(
{"name": "First floor", "type": "config/floor_registry/create"}
)
msg = await client.receive_json()
assert not msg["success"]
assert msg["error"]["code"] == "invalid_info"
assert (
msg["error"]["message"] == "The name First floor (firstfloor) is already in use"
)
assert len(floor_registry.floors) == 1
async def test_delete_floor(
client: MockHAClientWebSocket,
floor_registry: fr.FloorRegistry,
) -> None:
"""Test delete entry."""
floor = floor_registry.async_create("First floor")
assert len(floor_registry.floors) == 1
await client.send_json_auto_id(
{"floor_id": floor.floor_id, "type": "config/floor_registry/delete"}
)
msg = await client.receive_json()
assert msg["success"]
assert not floor_registry.floors
async def test_delete_non_existing_floor(
client: MockHAClientWebSocket,
floor_registry: fr.FloorRegistry,
) -> None:
"""Test delete entry that should fail."""
floor_registry.async_create("First floor")
assert len(floor_registry.floors) == 1
await client.send_json_auto_id(
{
"floor_id": "zaphotbeeblebrox",
"type": "config/floor_registry/delete",
}
)
msg = await client.receive_json()
assert not msg["success"]
assert msg["error"]["code"] == "invalid_info"
assert msg["error"]["message"] == "Floor ID doesn't exist"
assert len(floor_registry.floors) == 1
async def test_update_floor(
client: MockHAClientWebSocket,
floor_registry: fr.FloorRegistry,
) -> None:
"""Test update entry."""
floor = floor_registry.async_create("First floor")
assert len(floor_registry.floors) == 1
await client.send_json_auto_id(
{
"floor_id": floor.floor_id,
"name": "Second floor",
"icon": "mdi:home-floor-2",
"type": "config/floor_registry/update",
"level": 2,
}
)
msg = await client.receive_json()
assert len(floor_registry.floors) == 1
assert msg["result"] == {
"icon": "mdi:home-floor-2",
"floor_id": floor.floor_id,
"name": "Second floor",
"level": 2,
}
await client.send_json_auto_id(
{
"floor_id": floor.floor_id,
"name": "First floor",
"icon": None,
"level": 1,
"type": "config/floor_registry/update",
}
)
msg = await client.receive_json()
assert len(floor_registry.floors) == 1
assert msg["result"] == {
"icon": None,
"floor_id": floor.floor_id,
"name": "First floor",
"level": 1,
}
async def test_update_with_name_already_in_use(
client: MockHAClientWebSocket,
floor_registry: fr.FloorRegistry,
) -> None:
"""Test update entry."""
floor = floor_registry.async_create("First floor")
floor_registry.async_create("Second floor")
assert len(floor_registry.floors) == 2
await client.send_json_auto_id(
{
"floor_id": floor.floor_id,
"name": "Second floor",
"type": "config/floor_registry/update",
}
)
msg = await client.receive_json()
assert not msg["success"]
assert msg["error"]["code"] == "invalid_info"
assert (
msg["error"]["message"]
== "The name Second floor (secondfloor) is already in use"
)
assert len(floor_registry.floors) == 2

View file

@ -55,6 +55,7 @@ from homeassistant.helpers import (
config_entry_oauth2_flow,
device_registry as dr,
entity_registry as er,
floor_registry as fr,
issue_registry as ir,
recorder as recorder_helper,
)
@ -1601,6 +1602,12 @@ def entity_registry(hass: HomeAssistant) -> er.EntityRegistry:
return er.async_get(hass)
@pytest.fixture
def floor_registry(hass: HomeAssistant) -> fr.FloorRegistry:
"""Return the floor registry from the current hass instance."""
return fr.async_get(hass)
@pytest.fixture
def issue_registry(hass: HomeAssistant) -> ir.IssueRegistry:
"""Return the issue registry from the current hass instance."""

View file

@ -4,7 +4,7 @@ from typing import Any
import pytest
from homeassistant.core import HomeAssistant
from homeassistant.helpers import area_registry as ar
from homeassistant.helpers import area_registry as ar, floor_registry as fr
from tests.common import ANY, async_capture_events, flush_store
@ -27,6 +27,7 @@ async def test_create_area(hass: HomeAssistant, area_registry: ar.AreaRegistry)
assert area == ar.AreaEntry(
aliases=set(),
floor_id=None,
icon=None,
id=ANY,
name="mock",
@ -50,6 +51,7 @@ async def test_create_area(hass: HomeAssistant, area_registry: ar.AreaRegistry)
assert area == ar.AreaEntry(
aliases={"alias_1", "alias_2"},
floor_id=None,
icon=None,
id=ANY,
name="mock 2",
@ -133,14 +135,20 @@ async def test_delete_non_existing_area(area_registry: ar.AreaRegistry) -> None:
assert len(area_registry.areas) == 1
async def test_update_area(hass: HomeAssistant, area_registry: ar.AreaRegistry) -> None:
async def test_update_area(
hass: HomeAssistant,
area_registry: ar.AreaRegistry,
floor_registry: fr.FloorRegistry,
) -> None:
"""Make sure that we can read areas."""
update_events = async_capture_events(hass, ar.EVENT_AREA_REGISTRY_UPDATED)
floor_registry.async_create("first")
area = area_registry.async_create("mock")
updated_area = area_registry.async_update(
area.id,
aliases={"alias_1", "alias_2"},
floor_id="first",
icon="mdi:garage",
name="mock1",
picture="/image/example.png",
@ -149,6 +157,7 @@ async def test_update_area(hass: HomeAssistant, area_registry: ar.AreaRegistry)
assert updated_area != area
assert updated_area == ar.AreaEntry(
aliases={"alias_1", "alias_2"},
floor_id="first",
icon="mdi:garage",
id=ANY,
name="mock1",
@ -257,6 +266,7 @@ async def test_loading_area_from_storage(
"areas": [
{
"aliases": ["alias_1", "alias_2"],
"floor_id": "first_floor",
"id": "12345A",
"icon": "mdi:garage",
"name": "mock",
@ -299,6 +309,7 @@ async def test_migration_from_1_1(
"areas": [
{
"aliases": [],
"floor_id": None,
"icon": None,
"id": "12345A",
"name": "mock",
@ -345,3 +356,58 @@ async def test_async_get_area(area_registry: ar.AreaRegistry) -> None:
assert len(area_registry.areas) == 1
assert area_registry.async_get_area(area.id).normalized_name == "mock1"
async def test_removing_floors(
hass: HomeAssistant,
area_registry: ar.AreaRegistry,
floor_registry: fr.FloorRegistry,
) -> None:
"""Make sure we can clear floors."""
first_floor = floor_registry.async_create("First floor")
second_floor = floor_registry.async_create("Second floor")
kitchen = area_registry.async_create("Kitchen")
kitchen = area_registry.async_update(kitchen.id, floor_id=first_floor.floor_id)
bedroom = area_registry.async_create("Bedroom")
bedroom = area_registry.async_update(bedroom.id, floor_id=second_floor.floor_id)
floor_registry.async_delete(first_floor.floor_id)
await hass.async_block_till_done()
assert area_registry.async_get_area(kitchen.id).floor_id is None
assert area_registry.async_get_area(bedroom.id).floor_id == second_floor.floor_id
floor_registry.async_delete(second_floor.floor_id)
await hass.async_block_till_done()
assert area_registry.async_get_area(kitchen.id).floor_id is None
assert area_registry.async_get_area(bedroom.id).floor_id is None
@pytest.mark.usefixtures("hass")
async def test_entries_for_floor(
area_registry: ar.AreaRegistry,
floor_registry: fr.FloorRegistry,
) -> None:
"""Test getting area entries by floor."""
first_floor = floor_registry.async_create("First floor")
second_floor = floor_registry.async_create("Second floor")
kitchen = area_registry.async_create("Kitchen")
kitchen = area_registry.async_update(kitchen.id, floor_id=first_floor.floor_id)
living_room = area_registry.async_create("Living room")
living_room = area_registry.async_update(
living_room.id, floor_id=first_floor.floor_id
)
bedroom = area_registry.async_create("Bedroom")
bedroom = area_registry.async_update(bedroom.id, floor_id=second_floor.floor_id)
entries = ar.async_entries_for_floor(area_registry, first_floor.floor_id)
assert len(entries) == 2
assert entries == [kitchen, living_room]
entries = ar.async_entries_for_floor(area_registry, second_floor.floor_id)
assert len(entries) == 1
assert entries == [bedroom]
assert not ar.async_entries_for_floor(area_registry, "unknown")
assert not ar.async_entries_for_floor(area_registry, "")

View file

@ -0,0 +1,358 @@
"""Tests for the floor registry."""
import re
from typing import Any
import pytest
from homeassistant.core import HomeAssistant
from homeassistant.helpers import area_registry as ar, floor_registry as fr
from homeassistant.helpers.floor_registry import (
EVENT_FLOOR_REGISTRY_UPDATED,
STORAGE_KEY,
STORAGE_VERSION_MAJOR,
FloorRegistry,
async_get,
async_load,
)
from tests.common import async_capture_events, flush_store
async def test_list_floors(floor_registry: fr.FloorRegistry) -> None:
"""Make sure that we can read floors."""
floors = floor_registry.async_list_floors()
assert len(list(floors)) == len(floor_registry.floors)
async def test_create_floor(
hass: HomeAssistant, floor_registry: fr.FloorRegistry
) -> None:
"""Make sure that we can create floors."""
update_events = async_capture_events(hass, EVENT_FLOOR_REGISTRY_UPDATED)
floor = floor_registry.async_create(
name="First floor",
icon="mdi:home-floor-1",
aliases={"first", "ground", "ground floor"},
level=1,
)
assert floor.floor_id == "first_floor"
assert floor.name == "First floor"
assert floor.icon == "mdi:home-floor-1"
assert floor.aliases == {"first", "ground", "ground floor"}
assert floor.level == 1
assert len(floor_registry.floors) == 1
await hass.async_block_till_done()
assert len(update_events) == 1
assert update_events[0].data == {
"action": "create",
"floor_id": floor.floor_id,
}
async def test_create_floor_with_name_already_in_use(
hass: HomeAssistant, floor_registry: fr.FloorRegistry
) -> None:
"""Make sure that we can't create a floor with a name already in use."""
update_events = async_capture_events(hass, EVENT_FLOOR_REGISTRY_UPDATED)
floor_registry.async_create("First floor")
with pytest.raises(
ValueError,
match=re.escape("The name First floor (firstfloor) is already in use"),
):
floor_registry.async_create("First floor")
await hass.async_block_till_done()
assert len(floor_registry.floors) == 1
assert len(update_events) == 1
async def test_create_floor_with_id_already_in_use(
hass: HomeAssistant, floor_registry: fr.FloorRegistry
) -> None:
"""Make sure that we can't create an floor with an id already in use."""
floor = floor_registry.async_create("First")
updated_floor = floor_registry.async_update(floor.floor_id, name="Second")
assert updated_floor.floor_id == floor.floor_id
another_floor = floor_registry.async_create("First")
assert floor.floor_id != another_floor.floor_id
assert another_floor.floor_id == "first_2"
async def test_delete_floor(
hass: HomeAssistant, floor_registry: fr.FloorRegistry
) -> None:
"""Make sure that we can delete a floor."""
update_events = async_capture_events(hass, EVENT_FLOOR_REGISTRY_UPDATED)
floor = floor_registry.async_create("First floor")
assert len(floor_registry.floors) == 1
floor_registry.async_delete(floor.floor_id)
assert not floor_registry.floors
await hass.async_block_till_done()
assert len(update_events) == 2
assert update_events[0].data == {
"action": "create",
"floor_id": floor.floor_id,
}
assert update_events[1].data == {
"action": "remove",
"floor_id": floor.floor_id,
}
async def test_delete_non_existing_floor(floor_registry: fr.FloorRegistry) -> None:
"""Make sure that we can't delete a floor that doesn't exist."""
floor_registry.async_create("First floor")
with pytest.raises(KeyError):
floor_registry.async_delete("")
assert len(floor_registry.floors) == 1
async def test_update_floor(
hass: HomeAssistant, floor_registry: fr.FloorRegistry
) -> None:
"""Make sure that we can update floors."""
update_events = async_capture_events(hass, EVENT_FLOOR_REGISTRY_UPDATED)
floor = floor_registry.async_create("First floor")
assert len(floor_registry.floors) == 1
assert floor.floor_id == "first_floor"
assert floor.name == "First floor"
assert floor.icon is None
assert floor.aliases == set()
assert floor.level == 0
updated_floor = floor_registry.async_update(
floor.floor_id,
name="Second floor",
icon="mdi:home-floor-2",
aliases={"ground", "downstairs"},
level=2,
)
assert updated_floor != floor
assert updated_floor.floor_id == "first_floor"
assert updated_floor.name == "Second floor"
assert updated_floor.icon == "mdi:home-floor-2"
assert updated_floor.aliases == {"ground", "downstairs"}
assert updated_floor.level == 2
assert len(floor_registry.floors) == 1
await hass.async_block_till_done()
assert len(update_events) == 2
assert update_events[0].data == {
"action": "create",
"floor_id": floor.floor_id,
}
assert update_events[1].data == {
"action": "update",
"floor_id": floor.floor_id,
}
async def test_update_floor_with_same_data(
hass: HomeAssistant, floor_registry: fr.FloorRegistry
) -> None:
"""Make sure that we can reapply the same data to a floor and it won't update."""
update_events = async_capture_events(hass, EVENT_FLOOR_REGISTRY_UPDATED)
floor = floor_registry.async_create(
"First floor",
icon="mdi:home-floor-1",
)
updated_floor = floor_registry.async_update(
floor_id=floor.floor_id,
name="First floor",
icon="mdi:home-floor-1",
)
assert floor == updated_floor
await hass.async_block_till_done()
# No update event
assert len(update_events) == 1
assert update_events[0].data == {
"action": "create",
"floor_id": floor.floor_id,
}
async def test_update_floor_with_same_name_change_case(
floor_registry: fr.FloorRegistry,
) -> None:
"""Make sure that we can reapply the same name with a different case to a floor."""
floor = floor_registry.async_create("first floor")
updated_floor = floor_registry.async_update(floor.floor_id, name="First floor")
assert updated_floor.floor_id == floor.floor_id
assert updated_floor.name == "First floor"
assert updated_floor.normalized_name == floor.normalized_name
assert len(floor_registry.floors) == 1
async def test_update_floor_with_name_already_in_use(
floor_registry: fr.FloorRegistry,
) -> None:
"""Make sure that we can't update a floor with a name already in use."""
floor1 = floor_registry.async_create("First floor")
floor2 = floor_registry.async_create("Second floor")
with pytest.raises(
ValueError,
match=re.escape("The name Second floor (secondfloor) is already in use"),
):
floor_registry.async_update(floor1.floor_id, name="Second floor")
assert floor1.name == "First floor"
assert floor2.name == "Second floor"
assert len(floor_registry.floors) == 2
async def test_update_floor_with_normalized_name_already_in_use(
floor_registry: fr.FloorRegistry,
) -> None:
"""Make sure that we can't update a floor with a normalized name already in use."""
floor1 = floor_registry.async_create("first")
floor2 = floor_registry.async_create("S E C O N D")
with pytest.raises(
ValueError, match=re.escape("The name second (second) is already in use")
):
floor_registry.async_update(floor1.floor_id, name="second")
assert floor1.name == "first"
assert floor2.name == "S E C O N D"
assert len(floor_registry.floors) == 2
async def test_load_floors(
hass: HomeAssistant, floor_registry: fr.FloorRegistry
) -> None:
"""Make sure that we can load/save data correctly."""
floor1 = floor_registry.async_create(
"First floor",
icon="mdi:home-floor-1",
aliases={"first", "ground"},
level=1,
)
floor2 = floor_registry.async_create(
"Second floor",
icon="mdi:home-floor-2",
aliases={"first", "ground"},
level=2,
)
assert len(floor_registry.floors) == 2
registry2 = FloorRegistry(hass)
await flush_store(floor_registry._store)
await registry2.async_load()
assert len(registry2.floors) == 2
assert list(floor_registry.floors) == list(registry2.floors)
floor1_registry2 = registry2.async_get_floor_by_name("First floor")
assert floor1_registry2.floor_id == floor1.floor_id
assert floor1_registry2.name == floor1.name
assert floor1_registry2.icon == floor1.icon
assert floor1_registry2.aliases == floor1.aliases
assert floor1_registry2.level == floor1.level
assert floor1_registry2.normalized_name == floor1.normalized_name
floor2_registry2 = registry2.async_get_floor_by_name("Second floor")
assert floor2_registry2.floor_id == floor2.floor_id
assert floor2_registry2.name == floor2.name
assert floor2_registry2.icon == floor2.icon
assert floor2_registry2.aliases == floor2.aliases
assert floor2_registry2.level == floor2.level
assert floor2_registry2.normalized_name == floor2.normalized_name
@pytest.mark.parametrize("load_registries", [False])
async def test_loading_floors_from_storage(
hass: HomeAssistant, hass_storage: Any
) -> None:
"""Test loading stored floors on start."""
hass_storage[STORAGE_KEY] = {
"version": STORAGE_VERSION_MAJOR,
"data": {
"floors": [
{
"icon": "mdi:home-floor-1",
"floor_id": "first_floor",
"name": "First floor",
"aliases": ["first", "ground"],
"level": 1,
}
]
},
}
await async_load(hass)
registry = async_get(hass)
assert len(registry.floors) == 1
async def test_getting_floor(floor_registry: fr.FloorRegistry) -> None:
"""Make sure we can get the floors by name."""
floor = floor_registry.async_create("First floor")
floor2 = floor_registry.async_get_floor_by_name("first floor")
floor3 = floor_registry.async_get_floor_by_name("first floor")
assert floor == floor2
assert floor == floor3
assert floor2 == floor3
get_floor = floor_registry.async_get_floor(floor.floor_id)
assert get_floor == floor
async def test_async_get_floor_by_name_not_found(
floor_registry: fr.FloorRegistry,
) -> None:
"""Make sure we return None for non-existent floors."""
floor_registry.async_create("First floor")
assert len(floor_registry.floors) == 1
assert floor_registry.async_get_floor_by_name("non_exist") is None
async def test_floor_removed_from_areas(
hass: HomeAssistant,
area_registry: ar.AreaRegistry,
floor_registry: fr.FloorRegistry,
) -> None:
"""Test if floor gets removed from areas when the floor is removed."""
floor = floor_registry.async_create("First floor")
assert len(floor_registry.floors) == 1
entry = area_registry.async_create(name="Kitchen")
area_registry.async_update(entry.id, floor_id=floor.floor_id)
entries = ar.async_entries_for_floor(area_registry, floor.floor_id)
assert len(entries) == 1
floor_registry.async_delete(floor.floor_id)
await hass.async_block_till_done()
entries = ar.async_entries_for_floor(area_registry, floor.floor_id)
assert len(entries) == 0