Use storage based collections for input_number entities (#30576)
* Use collections for input_number. * Add tests. * Typo fix. * Make editable a public attribute. * Move async_setup to top. * Update homeassistant/components/input_number/__init__.py Co-Authored-By: Paulus Schoutsen <paulus@home-assistant.io> * Update homeassistant/components/input_number/__init__.py Co-Authored-By: Paulus Schoutsen <paulus@home-assistant.io> * Cleanup. * async_write_ha_state() Co-authored-by: Paulus Schoutsen <paulus@home-assistant.io>
This commit is contained in:
parent
c1c90b8034
commit
9c551ae85d
2 changed files with 383 additions and 54 deletions
|
@ -12,15 +12,57 @@ from homeassistant.components.input_number import (
|
|||
SERVICE_RELOAD,
|
||||
SERVICE_SET_VALUE,
|
||||
)
|
||||
from homeassistant.const import ATTR_ENTITY_ID
|
||||
from homeassistant.const import (
|
||||
ATTR_EDITABLE,
|
||||
ATTR_ENTITY_ID,
|
||||
ATTR_FRIENDLY_NAME,
|
||||
ATTR_NAME,
|
||||
)
|
||||
from homeassistant.core import Context, CoreState, State
|
||||
from homeassistant.exceptions import Unauthorized
|
||||
from homeassistant.helpers import entity_registry
|
||||
from homeassistant.loader import bind_hass
|
||||
from homeassistant.setup import async_setup_component
|
||||
|
||||
from tests.common import mock_restore_cache
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def storage_setup(hass, hass_storage):
|
||||
"""Storage setup."""
|
||||
|
||||
async def _storage(items=None, config=None):
|
||||
if items is None:
|
||||
hass_storage[DOMAIN] = {
|
||||
"key": DOMAIN,
|
||||
"version": 1,
|
||||
"data": {
|
||||
"items": [
|
||||
{
|
||||
"id": "from_storage",
|
||||
"initial": 10,
|
||||
"name": "from storage",
|
||||
"max": 100,
|
||||
"min": 0,
|
||||
"step": 1,
|
||||
"mode": "slider",
|
||||
}
|
||||
]
|
||||
},
|
||||
}
|
||||
else:
|
||||
hass_storage[DOMAIN] = {
|
||||
"key": DOMAIN,
|
||||
"version": 1,
|
||||
"data": {"items": items},
|
||||
}
|
||||
if config is None:
|
||||
config = {DOMAIN: {}}
|
||||
return await async_setup_component(hass, DOMAIN, config)
|
||||
|
||||
return _storage
|
||||
|
||||
|
||||
@bind_hass
|
||||
def set_value(hass, entity_id, value):
|
||||
"""Set input_number to value.
|
||||
|
@ -258,19 +300,33 @@ async def test_input_number_context(hass, hass_admin_user):
|
|||
async def test_reload(hass, hass_admin_user, hass_read_only_user):
|
||||
"""Test reload service."""
|
||||
count_start = len(hass.states.async_entity_ids())
|
||||
ent_reg = await entity_registry.async_get_registry(hass)
|
||||
|
||||
assert await async_setup_component(
|
||||
hass, DOMAIN, {DOMAIN: {"test_1": {"initial": 50, "min": 0, "max": 51}}}
|
||||
hass,
|
||||
DOMAIN,
|
||||
{
|
||||
DOMAIN: {
|
||||
"test_1": {"initial": 50, "min": 0, "max": 51},
|
||||
"test_3": {"initial": 10, "min": 0, "max": 15},
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
assert count_start + 1 == len(hass.states.async_entity_ids())
|
||||
assert count_start + 2 == len(hass.states.async_entity_ids())
|
||||
|
||||
state_1 = hass.states.get("input_number.test_1")
|
||||
state_2 = hass.states.get("input_number.test_2")
|
||||
state_3 = hass.states.get("input_number.test_3")
|
||||
|
||||
assert state_1 is not None
|
||||
assert state_2 is None
|
||||
assert state_3 is not None
|
||||
assert 50 == float(state_1.state)
|
||||
assert 10 == float(state_3.state)
|
||||
assert ent_reg.async_get_entity_id(DOMAIN, DOMAIN, "test_1") is not None
|
||||
assert ent_reg.async_get_entity_id(DOMAIN, DOMAIN, "test_2") is None
|
||||
assert ent_reg.async_get_entity_id(DOMAIN, DOMAIN, "test_3") is not None
|
||||
|
||||
with patch(
|
||||
"homeassistant.config.load_yaml_config_file",
|
||||
|
@ -302,8 +358,189 @@ async def test_reload(hass, hass_admin_user, hass_read_only_user):
|
|||
|
||||
state_1 = hass.states.get("input_number.test_1")
|
||||
state_2 = hass.states.get("input_number.test_2")
|
||||
state_3 = hass.states.get("input_number.test_3")
|
||||
|
||||
assert state_1 is not None
|
||||
assert state_2 is not None
|
||||
assert 40 == float(state_1.state)
|
||||
assert state_3 is None
|
||||
assert 50 == float(state_1.state)
|
||||
assert 20 == float(state_2.state)
|
||||
assert ent_reg.async_get_entity_id(DOMAIN, DOMAIN, "test_1") is not None
|
||||
assert ent_reg.async_get_entity_id(DOMAIN, DOMAIN, "test_2") is not None
|
||||
assert ent_reg.async_get_entity_id(DOMAIN, DOMAIN, "test_3") is None
|
||||
|
||||
|
||||
async def test_load_from_storage(hass, storage_setup):
|
||||
"""Test set up from storage."""
|
||||
assert await storage_setup()
|
||||
state = hass.states.get(f"{DOMAIN}.from_storage")
|
||||
assert float(state.state) == 10
|
||||
assert state.attributes.get(ATTR_FRIENDLY_NAME) == "from storage"
|
||||
assert state.attributes.get(ATTR_EDITABLE)
|
||||
|
||||
|
||||
async def test_editable_state_attribute(hass, storage_setup):
|
||||
"""Test editable attribute."""
|
||||
assert await storage_setup(
|
||||
config={
|
||||
DOMAIN: {
|
||||
"from_yaml": {
|
||||
"min": 1,
|
||||
"max": 10,
|
||||
"initial": 5,
|
||||
"step": 1,
|
||||
"mode": "slider",
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
state = hass.states.get(f"{DOMAIN}.from_storage")
|
||||
assert float(state.state) == 10
|
||||
assert state.attributes.get(ATTR_FRIENDLY_NAME) == "from storage"
|
||||
assert state.attributes.get(ATTR_EDITABLE)
|
||||
|
||||
state = hass.states.get(f"{DOMAIN}.from_yaml")
|
||||
assert float(state.state) == 5
|
||||
assert not state.attributes.get(ATTR_EDITABLE)
|
||||
|
||||
|
||||
async def test_ws_list(hass, hass_ws_client, storage_setup):
|
||||
"""Test listing via WS."""
|
||||
assert await storage_setup(
|
||||
config={
|
||||
DOMAIN: {
|
||||
"from_yaml": {
|
||||
"min": 1,
|
||||
"max": 10,
|
||||
"initial": 5,
|
||||
"step": 1,
|
||||
"mode": "slider",
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
client = await hass_ws_client(hass)
|
||||
|
||||
await client.send_json({"id": 6, "type": f"{DOMAIN}/list"})
|
||||
resp = await client.receive_json()
|
||||
assert resp["success"]
|
||||
|
||||
storage_ent = "from_storage"
|
||||
yaml_ent = "from_yaml"
|
||||
result = {item["id"]: item for item in resp["result"]}
|
||||
|
||||
assert len(result) == 1
|
||||
assert storage_ent in result
|
||||
assert yaml_ent not in result
|
||||
assert result[storage_ent][ATTR_NAME] == "from storage"
|
||||
|
||||
|
||||
async def test_ws_delete(hass, hass_ws_client, storage_setup):
|
||||
"""Test WS delete cleans up entity registry."""
|
||||
assert await storage_setup()
|
||||
|
||||
input_id = "from_storage"
|
||||
input_entity_id = f"{DOMAIN}.{input_id}"
|
||||
ent_reg = await entity_registry.async_get_registry(hass)
|
||||
|
||||
state = hass.states.get(input_entity_id)
|
||||
assert state is not None
|
||||
assert ent_reg.async_get_entity_id(DOMAIN, DOMAIN, input_id) is not None
|
||||
|
||||
client = await hass_ws_client(hass)
|
||||
|
||||
await client.send_json(
|
||||
{"id": 6, "type": f"{DOMAIN}/delete", f"{DOMAIN}_id": f"{input_id}"}
|
||||
)
|
||||
resp = await client.receive_json()
|
||||
assert resp["success"]
|
||||
|
||||
state = hass.states.get(input_entity_id)
|
||||
assert state is None
|
||||
assert ent_reg.async_get_entity_id(DOMAIN, DOMAIN, input_id) is None
|
||||
|
||||
|
||||
async def test_update_min_max(hass, hass_ws_client, storage_setup):
|
||||
"""Test updating min/max updates the state."""
|
||||
|
||||
items = [
|
||||
{
|
||||
"id": "from_storage",
|
||||
"name": "from storage",
|
||||
"max": 100,
|
||||
"min": 0,
|
||||
"step": 1,
|
||||
"mode": "slider",
|
||||
}
|
||||
]
|
||||
assert await storage_setup(items)
|
||||
|
||||
input_id = "from_storage"
|
||||
input_entity_id = f"{DOMAIN}.{input_id}"
|
||||
ent_reg = await entity_registry.async_get_registry(hass)
|
||||
|
||||
state = hass.states.get(input_entity_id)
|
||||
assert state is not None
|
||||
assert state.state
|
||||
assert ent_reg.async_get_entity_id(DOMAIN, DOMAIN, input_id) is not None
|
||||
|
||||
client = await hass_ws_client(hass)
|
||||
|
||||
await client.send_json(
|
||||
{"id": 6, "type": f"{DOMAIN}/update", f"{DOMAIN}_id": f"{input_id}", "min": 9}
|
||||
)
|
||||
resp = await client.receive_json()
|
||||
assert resp["success"]
|
||||
|
||||
state = hass.states.get(input_entity_id)
|
||||
assert float(state.state) == 9
|
||||
|
||||
await client.send_json(
|
||||
{
|
||||
"id": 7,
|
||||
"type": f"{DOMAIN}/update",
|
||||
f"{DOMAIN}_id": f"{input_id}",
|
||||
"max": 5,
|
||||
"min": 0,
|
||||
}
|
||||
)
|
||||
resp = await client.receive_json()
|
||||
assert resp["success"]
|
||||
|
||||
state = hass.states.get(input_entity_id)
|
||||
assert float(state.state) == 5
|
||||
|
||||
|
||||
async def test_ws_create(hass, hass_ws_client, storage_setup):
|
||||
"""Test create WS."""
|
||||
assert await storage_setup(items=[])
|
||||
|
||||
input_id = "new_input"
|
||||
input_entity_id = f"{DOMAIN}.{input_id}"
|
||||
ent_reg = await entity_registry.async_get_registry(hass)
|
||||
|
||||
state = hass.states.get(input_entity_id)
|
||||
assert state is None
|
||||
assert ent_reg.async_get_entity_id(DOMAIN, DOMAIN, input_id) is None
|
||||
|
||||
client = await hass_ws_client(hass)
|
||||
|
||||
await client.send_json(
|
||||
{
|
||||
"id": 6,
|
||||
"type": f"{DOMAIN}/create",
|
||||
"name": "New Input",
|
||||
"max": 20,
|
||||
"min": 0,
|
||||
"initial": 10,
|
||||
"step": 1,
|
||||
"mode": "slider",
|
||||
}
|
||||
)
|
||||
resp = await client.receive_json()
|
||||
assert resp["success"]
|
||||
|
||||
state = hass.states.get(input_entity_id)
|
||||
assert float(state.state) == 10
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue