Add permissions foundation (#16890)
* Add permission foundation * Address comments * typing * False > True * Convert more lambdas * Use constants * Remove support for False * Fix only allow True
This commit is contained in:
parent
5961f2f577
commit
61f7a39748
7 changed files with 532 additions and 9 deletions
|
@ -10,6 +10,7 @@ from homeassistant.core import HomeAssistant, callback
|
||||||
from homeassistant.util import dt as dt_util
|
from homeassistant.util import dt as dt_util
|
||||||
|
|
||||||
from . import models
|
from . import models
|
||||||
|
from .permissions import DEFAULT_POLICY
|
||||||
|
|
||||||
STORAGE_VERSION = 1
|
STORAGE_VERSION = 1
|
||||||
STORAGE_KEY = 'auth'
|
STORAGE_KEY = 'auth'
|
||||||
|
@ -245,12 +246,16 @@ class AuthStore:
|
||||||
groups[group_dict['id']] = models.Group(
|
groups[group_dict['id']] = models.Group(
|
||||||
name=group_dict['name'],
|
name=group_dict['name'],
|
||||||
id=group_dict['id'],
|
id=group_dict['id'],
|
||||||
|
policy=group_dict.get('policy', DEFAULT_POLICY),
|
||||||
)
|
)
|
||||||
|
|
||||||
migrate_group = None
|
migrate_group = None
|
||||||
|
|
||||||
if not groups:
|
if not groups:
|
||||||
migrate_group = models.Group(name=INITIAL_GROUP_NAME)
|
migrate_group = models.Group(
|
||||||
|
name=INITIAL_GROUP_NAME,
|
||||||
|
policy=DEFAULT_POLICY
|
||||||
|
)
|
||||||
groups[migrate_group.id] = migrate_group
|
groups[migrate_group.id] = migrate_group
|
||||||
|
|
||||||
for user_dict in data['users']:
|
for user_dict in data['users']:
|
||||||
|
@ -348,13 +353,17 @@ class AuthStore:
|
||||||
for user in self._users.values()
|
for user in self._users.values()
|
||||||
]
|
]
|
||||||
|
|
||||||
groups = [
|
groups = []
|
||||||
{
|
for group in self._groups.values():
|
||||||
|
g_dict = {
|
||||||
'name': group.name,
|
'name': group.name,
|
||||||
'id': group.id,
|
'id': group.id,
|
||||||
}
|
} # type: Dict[str, Any]
|
||||||
for group in self._groups.values()
|
|
||||||
]
|
if group.policy is not DEFAULT_POLICY:
|
||||||
|
g_dict['policy'] = group.policy
|
||||||
|
|
||||||
|
groups.append(g_dict)
|
||||||
|
|
||||||
credentials = [
|
credentials = [
|
||||||
{
|
{
|
||||||
|
@ -402,7 +411,10 @@ class AuthStore:
|
||||||
self._users = OrderedDict() # type: Dict[str, models.User]
|
self._users = OrderedDict() # type: Dict[str, models.User]
|
||||||
|
|
||||||
# Add default group
|
# Add default group
|
||||||
all_access_group = models.Group(name=INITIAL_GROUP_NAME)
|
all_access_group = models.Group(
|
||||||
|
name=INITIAL_GROUP_NAME,
|
||||||
|
policy=DEFAULT_POLICY,
|
||||||
|
)
|
||||||
|
|
||||||
groups = OrderedDict() # type: Dict[str, models.Group]
|
groups = OrderedDict() # type: Dict[str, models.Group]
|
||||||
groups[all_access_group.id] = all_access_group
|
groups[all_access_group.id] = all_access_group
|
||||||
|
|
|
@ -7,6 +7,7 @@ import attr
|
||||||
|
|
||||||
from homeassistant.util import dt as dt_util
|
from homeassistant.util import dt as dt_util
|
||||||
|
|
||||||
|
from . import permissions as perm_mdl
|
||||||
from .util import generate_secret
|
from .util import generate_secret
|
||||||
|
|
||||||
TOKEN_TYPE_NORMAL = 'normal'
|
TOKEN_TYPE_NORMAL = 'normal'
|
||||||
|
@ -19,6 +20,7 @@ class Group:
|
||||||
"""A group."""
|
"""A group."""
|
||||||
|
|
||||||
name = attr.ib(type=str) # type: Optional[str]
|
name = attr.ib(type=str) # type: Optional[str]
|
||||||
|
policy = attr.ib(type=perm_mdl.PolicyType)
|
||||||
id = attr.ib(type=str, factory=lambda: uuid.uuid4().hex)
|
id = attr.ib(type=str, factory=lambda: uuid.uuid4().hex)
|
||||||
|
|
||||||
|
|
||||||
|
@ -44,6 +46,28 @@ class User:
|
||||||
type=dict, factory=dict, cmp=False
|
type=dict, factory=dict, cmp=False
|
||||||
) # type: Dict[str, RefreshToken]
|
) # type: Dict[str, RefreshToken]
|
||||||
|
|
||||||
|
_permissions = attr.ib(
|
||||||
|
type=perm_mdl.PolicyPermissions,
|
||||||
|
init=False,
|
||||||
|
cmp=False,
|
||||||
|
default=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def permissions(self) -> perm_mdl.AbstractPermissions:
|
||||||
|
"""Return permissions object for user."""
|
||||||
|
if self.is_owner:
|
||||||
|
return perm_mdl.OwnerPermissions
|
||||||
|
|
||||||
|
if self._permissions is not None:
|
||||||
|
return self._permissions
|
||||||
|
|
||||||
|
self._permissions = perm_mdl.PolicyPermissions(
|
||||||
|
perm_mdl.merge_policies([
|
||||||
|
group.policy for group in self.groups]))
|
||||||
|
|
||||||
|
return self._permissions
|
||||||
|
|
||||||
|
|
||||||
@attr.s(slots=True)
|
@attr.s(slots=True)
|
||||||
class RefreshToken:
|
class RefreshToken:
|
||||||
|
|
252
homeassistant/auth/permissions.py
Normal file
252
homeassistant/auth/permissions.py
Normal file
|
@ -0,0 +1,252 @@
|
||||||
|
"""Permissions for Home Assistant."""
|
||||||
|
from typing import ( # noqa: F401
|
||||||
|
cast, Any, Callable, Dict, List, Mapping, Set, Tuple, Union)
|
||||||
|
|
||||||
|
import voluptuous as vol
|
||||||
|
|
||||||
|
from homeassistant.core import State
|
||||||
|
|
||||||
|
CategoryType = Union[Mapping[str, 'CategoryType'], bool, None]
|
||||||
|
PolicyType = Mapping[str, CategoryType]
|
||||||
|
|
||||||
|
|
||||||
|
# Default policy if group has no policy applied.
|
||||||
|
DEFAULT_POLICY = {
|
||||||
|
"entities": True
|
||||||
|
} # type: PolicyType
|
||||||
|
|
||||||
|
CAT_ENTITIES = 'entities'
|
||||||
|
ENTITY_DOMAINS = 'domains'
|
||||||
|
ENTITY_ENTITY_IDS = 'entity_ids'
|
||||||
|
|
||||||
|
VALUES_SCHEMA = vol.Any(True, vol.Schema({
|
||||||
|
str: True
|
||||||
|
}))
|
||||||
|
|
||||||
|
ENTITY_POLICY_SCHEMA = vol.Any(True, vol.Schema({
|
||||||
|
vol.Optional(ENTITY_DOMAINS): VALUES_SCHEMA,
|
||||||
|
vol.Optional(ENTITY_ENTITY_IDS): VALUES_SCHEMA,
|
||||||
|
}))
|
||||||
|
|
||||||
|
POLICY_SCHEMA = vol.Schema({
|
||||||
|
vol.Optional(CAT_ENTITIES): ENTITY_POLICY_SCHEMA
|
||||||
|
})
|
||||||
|
|
||||||
|
|
||||||
|
class AbstractPermissions:
|
||||||
|
"""Default permissions class."""
|
||||||
|
|
||||||
|
def check_entity(self, entity_id: str, *keys: str) -> bool:
|
||||||
|
"""Test if we can access entity."""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def filter_states(self, states: List[State]) -> List[State]:
|
||||||
|
"""Filter a list of states for what the user is allowed to see."""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
|
||||||
|
class PolicyPermissions(AbstractPermissions):
|
||||||
|
"""Handle permissions."""
|
||||||
|
|
||||||
|
def __init__(self, policy: PolicyType) -> None:
|
||||||
|
"""Initialize the permission class."""
|
||||||
|
self._policy = policy
|
||||||
|
self._compiled = {} # type: Dict[str, Callable[..., bool]]
|
||||||
|
|
||||||
|
def check_entity(self, entity_id: str, *keys: str) -> bool:
|
||||||
|
"""Test if we can access entity."""
|
||||||
|
func = self._policy_func(CAT_ENTITIES, _compile_entities)
|
||||||
|
return func(entity_id, keys)
|
||||||
|
|
||||||
|
def filter_states(self, states: List[State]) -> List[State]:
|
||||||
|
"""Filter a list of states for what the user is allowed to see."""
|
||||||
|
func = self._policy_func(CAT_ENTITIES, _compile_entities)
|
||||||
|
keys = ('read',)
|
||||||
|
return [entity for entity in states if func(entity.entity_id, keys)]
|
||||||
|
|
||||||
|
def _policy_func(self, category: str,
|
||||||
|
compile_func: Callable[[CategoryType], Callable]) \
|
||||||
|
-> Callable[..., bool]:
|
||||||
|
"""Get a policy function."""
|
||||||
|
func = self._compiled.get(category)
|
||||||
|
|
||||||
|
if func:
|
||||||
|
return func
|
||||||
|
|
||||||
|
func = self._compiled[category] = compile_func(
|
||||||
|
self._policy.get(category))
|
||||||
|
return func
|
||||||
|
|
||||||
|
def __eq__(self, other: Any) -> bool:
|
||||||
|
"""Equals check."""
|
||||||
|
# pylint: disable=protected-access
|
||||||
|
return (isinstance(other, PolicyPermissions) and
|
||||||
|
other._policy == self._policy)
|
||||||
|
|
||||||
|
|
||||||
|
class _OwnerPermissions(AbstractPermissions):
|
||||||
|
"""Owner permissions."""
|
||||||
|
|
||||||
|
# pylint: disable=no-self-use
|
||||||
|
|
||||||
|
def check_entity(self, entity_id: str, *keys: str) -> bool:
|
||||||
|
"""Test if we can access entity."""
|
||||||
|
return True
|
||||||
|
|
||||||
|
def filter_states(self, states: List[State]) -> List[State]:
|
||||||
|
"""Filter a list of states for what the user is allowed to see."""
|
||||||
|
return states
|
||||||
|
|
||||||
|
|
||||||
|
OwnerPermissions = _OwnerPermissions() # pylint: disable=invalid-name
|
||||||
|
|
||||||
|
|
||||||
|
def _compile_entities(policy: CategoryType) \
|
||||||
|
-> Callable[[str, Tuple[str]], bool]:
|
||||||
|
"""Compile policy into a function that tests policy."""
|
||||||
|
# None, Empty Dict, False
|
||||||
|
if not policy:
|
||||||
|
def apply_policy_deny_all(entity_id: str, keys: Tuple[str]) -> bool:
|
||||||
|
"""Decline all."""
|
||||||
|
return False
|
||||||
|
|
||||||
|
return apply_policy_deny_all
|
||||||
|
|
||||||
|
if policy is True:
|
||||||
|
def apply_policy_allow_all(entity_id: str, keys: Tuple[str]) -> bool:
|
||||||
|
"""Approve all."""
|
||||||
|
return True
|
||||||
|
|
||||||
|
return apply_policy_allow_all
|
||||||
|
|
||||||
|
assert isinstance(policy, dict)
|
||||||
|
|
||||||
|
domains = policy.get(ENTITY_DOMAINS)
|
||||||
|
entity_ids = policy.get(ENTITY_ENTITY_IDS)
|
||||||
|
|
||||||
|
funcs = [] # type: List[Callable[[str, Tuple[str]], Union[None, bool]]]
|
||||||
|
|
||||||
|
# The order of these functions matter. The more precise are at the top.
|
||||||
|
# If a function returns None, they cannot handle it.
|
||||||
|
# If a function returns a boolean, that's the result to return.
|
||||||
|
|
||||||
|
# Setting entity_ids to a boolean is final decision for permissions
|
||||||
|
# So return right away.
|
||||||
|
if isinstance(entity_ids, bool):
|
||||||
|
def apply_entity_id_policy(entity_id: str, keys: Tuple[str]) -> bool:
|
||||||
|
"""Test if allowed entity_id."""
|
||||||
|
return entity_ids # type: ignore
|
||||||
|
|
||||||
|
return apply_entity_id_policy
|
||||||
|
|
||||||
|
if entity_ids is not None:
|
||||||
|
def allowed_entity_id(entity_id: str, keys: Tuple[str]) \
|
||||||
|
-> Union[None, bool]:
|
||||||
|
"""Test if allowed entity_id."""
|
||||||
|
return entity_ids.get(entity_id) # type: ignore
|
||||||
|
|
||||||
|
funcs.append(allowed_entity_id)
|
||||||
|
|
||||||
|
if isinstance(domains, bool):
|
||||||
|
def allowed_domain(entity_id: str, keys: Tuple[str]) \
|
||||||
|
-> Union[None, bool]:
|
||||||
|
"""Test if allowed domain."""
|
||||||
|
return domains
|
||||||
|
|
||||||
|
funcs.append(allowed_domain)
|
||||||
|
|
||||||
|
elif domains is not None:
|
||||||
|
def allowed_domain(entity_id: str, keys: Tuple[str]) \
|
||||||
|
-> Union[None, bool]:
|
||||||
|
"""Test if allowed domain."""
|
||||||
|
domain = entity_id.split(".", 1)[0]
|
||||||
|
return domains.get(domain) # type: ignore
|
||||||
|
|
||||||
|
funcs.append(allowed_domain)
|
||||||
|
|
||||||
|
# Can happen if no valid subcategories specified
|
||||||
|
if not funcs:
|
||||||
|
def apply_policy_deny_all_2(entity_id: str, keys: Tuple[str]) -> bool:
|
||||||
|
"""Decline all."""
|
||||||
|
return False
|
||||||
|
|
||||||
|
return apply_policy_deny_all_2
|
||||||
|
|
||||||
|
if len(funcs) == 1:
|
||||||
|
func = funcs[0]
|
||||||
|
|
||||||
|
def apply_policy_func(entity_id: str, keys: Tuple[str]) -> bool:
|
||||||
|
"""Apply a single policy function."""
|
||||||
|
return func(entity_id, keys) is True
|
||||||
|
|
||||||
|
return apply_policy_func
|
||||||
|
|
||||||
|
def apply_policy_funcs(entity_id: str, keys: Tuple[str]) -> bool:
|
||||||
|
"""Apply several policy functions."""
|
||||||
|
for func in funcs:
|
||||||
|
result = func(entity_id, keys)
|
||||||
|
if result is not None:
|
||||||
|
return result
|
||||||
|
return False
|
||||||
|
|
||||||
|
return apply_policy_funcs
|
||||||
|
|
||||||
|
|
||||||
|
def merge_policies(policies: List[PolicyType]) -> PolicyType:
|
||||||
|
"""Merge policies."""
|
||||||
|
new_policy = {} # type: Dict[str, CategoryType]
|
||||||
|
seen = set() # type: Set[str]
|
||||||
|
for policy in policies:
|
||||||
|
for category in policy:
|
||||||
|
if category in seen:
|
||||||
|
continue
|
||||||
|
seen.add(category)
|
||||||
|
new_policy[category] = _merge_policies([
|
||||||
|
policy.get(category) for policy in policies])
|
||||||
|
cast(PolicyType, new_policy)
|
||||||
|
return new_policy
|
||||||
|
|
||||||
|
|
||||||
|
def _merge_policies(sources: List[CategoryType]) -> CategoryType:
|
||||||
|
"""Merge a policy."""
|
||||||
|
# When merging policies, the most permissive wins.
|
||||||
|
# This means we order it like this:
|
||||||
|
# True > Dict > None
|
||||||
|
#
|
||||||
|
# True: allow everything
|
||||||
|
# Dict: specify more granular permissions
|
||||||
|
# None: no opinion
|
||||||
|
#
|
||||||
|
# If there are multiple sources with a dict as policy, we recursively
|
||||||
|
# merge each key in the source.
|
||||||
|
|
||||||
|
policy = None # type: CategoryType
|
||||||
|
seen = set() # type: Set[str]
|
||||||
|
for source in sources:
|
||||||
|
if source is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# A source that's True will always win. Shortcut return.
|
||||||
|
if source is True:
|
||||||
|
return True
|
||||||
|
|
||||||
|
assert isinstance(source, dict)
|
||||||
|
|
||||||
|
if policy is None:
|
||||||
|
policy = {}
|
||||||
|
|
||||||
|
assert isinstance(policy, dict)
|
||||||
|
|
||||||
|
for key in source:
|
||||||
|
if key in seen:
|
||||||
|
continue
|
||||||
|
seen.add(key)
|
||||||
|
|
||||||
|
key_sources = []
|
||||||
|
for src in sources:
|
||||||
|
if isinstance(src, dict):
|
||||||
|
key_sources.append(src.get(key))
|
||||||
|
|
||||||
|
policy[key] = _merge_policies(key_sources)
|
||||||
|
|
||||||
|
return policy
|
|
@ -302,6 +302,7 @@ async def test_saving_loading(hass, hass_storage):
|
||||||
store2 = auth_store.AuthStore(hass)
|
store2 = auth_store.AuthStore(hass)
|
||||||
users = await store2.async_get_users()
|
users = await store2.async_get_users()
|
||||||
assert len(users) == 1
|
assert len(users) == 1
|
||||||
|
assert users[0].permissions == user.permissions
|
||||||
assert users[0] == user
|
assert users[0] == user
|
||||||
assert len(users[0].refresh_tokens) == 2
|
assert len(users[0].refresh_tokens) == 2
|
||||||
for r_token in users[0].refresh_tokens.values():
|
for r_token in users[0].refresh_tokens.values():
|
||||||
|
|
34
tests/auth/test_models.py
Normal file
34
tests/auth/test_models.py
Normal file
|
@ -0,0 +1,34 @@
|
||||||
|
"""Tests for the auth models."""
|
||||||
|
from homeassistant.auth import models, permissions
|
||||||
|
|
||||||
|
|
||||||
|
def test_owner_fetching_owner_permissions():
|
||||||
|
"""Test we fetch the owner permissions for an owner user."""
|
||||||
|
group = models.Group(name="Test Group", policy={})
|
||||||
|
owner = models.User(name="Test User", groups=[group], is_owner=True)
|
||||||
|
assert owner.permissions is permissions.OwnerPermissions
|
||||||
|
|
||||||
|
|
||||||
|
def test_permissions_merged():
|
||||||
|
"""Test we merge the groups permissions."""
|
||||||
|
group = models.Group(name="Test Group", policy={
|
||||||
|
'entities': {
|
||||||
|
'domains': {
|
||||||
|
'switch': True
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
group2 = models.Group(name="Test Group", policy={
|
||||||
|
'entities': {
|
||||||
|
'entity_ids': {
|
||||||
|
'light.kitchen': True
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
user = models.User(name="Test User", groups=[group, group2])
|
||||||
|
# Make sure we cache instance
|
||||||
|
assert user.permissions is user.permissions
|
||||||
|
|
||||||
|
assert user.permissions.check_entity('switch.bla') is True
|
||||||
|
assert user.permissions.check_entity('light.kitchen') is True
|
||||||
|
assert user.permissions.check_entity('light.not_kitchen') is False
|
198
tests/auth/test_permissions.py
Normal file
198
tests/auth/test_permissions.py
Normal file
|
@ -0,0 +1,198 @@
|
||||||
|
"""Tests for the auth permission system."""
|
||||||
|
import pytest
|
||||||
|
import voluptuous as vol
|
||||||
|
|
||||||
|
from homeassistant.core import State
|
||||||
|
from homeassistant.auth import permissions
|
||||||
|
|
||||||
|
|
||||||
|
def test_entities_none():
|
||||||
|
"""Test entity ID policy."""
|
||||||
|
policy = None
|
||||||
|
compiled = permissions._compile_entities(policy)
|
||||||
|
assert compiled('light.kitchen', []) is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_entities_empty():
|
||||||
|
"""Test entity ID policy."""
|
||||||
|
policy = {}
|
||||||
|
permissions.ENTITY_POLICY_SCHEMA(policy)
|
||||||
|
compiled = permissions._compile_entities(policy)
|
||||||
|
assert compiled('light.kitchen', []) is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_entities_false():
|
||||||
|
"""Test entity ID policy."""
|
||||||
|
policy = False
|
||||||
|
with pytest.raises(vol.Invalid):
|
||||||
|
permissions.ENTITY_POLICY_SCHEMA(policy)
|
||||||
|
|
||||||
|
|
||||||
|
def test_entities_true():
|
||||||
|
"""Test entity ID policy."""
|
||||||
|
policy = True
|
||||||
|
permissions.ENTITY_POLICY_SCHEMA(policy)
|
||||||
|
compiled = permissions._compile_entities(policy)
|
||||||
|
assert compiled('light.kitchen', []) is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_entities_domains_true():
|
||||||
|
"""Test entity ID policy."""
|
||||||
|
policy = {
|
||||||
|
'domains': True
|
||||||
|
}
|
||||||
|
permissions.ENTITY_POLICY_SCHEMA(policy)
|
||||||
|
compiled = permissions._compile_entities(policy)
|
||||||
|
assert compiled('light.kitchen', []) is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_entities_domains_domain_true():
|
||||||
|
"""Test entity ID policy."""
|
||||||
|
policy = {
|
||||||
|
'domains': {
|
||||||
|
'light': True
|
||||||
|
}
|
||||||
|
}
|
||||||
|
permissions.ENTITY_POLICY_SCHEMA(policy)
|
||||||
|
compiled = permissions._compile_entities(policy)
|
||||||
|
assert compiled('light.kitchen', []) is True
|
||||||
|
assert compiled('switch.kitchen', []) is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_entities_domains_domain_false():
|
||||||
|
"""Test entity ID policy."""
|
||||||
|
policy = {
|
||||||
|
'domains': {
|
||||||
|
'light': False
|
||||||
|
}
|
||||||
|
}
|
||||||
|
with pytest.raises(vol.Invalid):
|
||||||
|
permissions.ENTITY_POLICY_SCHEMA(policy)
|
||||||
|
|
||||||
|
|
||||||
|
def test_entities_entity_ids_true():
|
||||||
|
"""Test entity ID policy."""
|
||||||
|
policy = {
|
||||||
|
'entity_ids': True
|
||||||
|
}
|
||||||
|
permissions.ENTITY_POLICY_SCHEMA(policy)
|
||||||
|
compiled = permissions._compile_entities(policy)
|
||||||
|
assert compiled('light.kitchen', []) is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_entities_entity_ids_false():
|
||||||
|
"""Test entity ID policy."""
|
||||||
|
policy = {
|
||||||
|
'entity_ids': False
|
||||||
|
}
|
||||||
|
with pytest.raises(vol.Invalid):
|
||||||
|
permissions.ENTITY_POLICY_SCHEMA(policy)
|
||||||
|
|
||||||
|
|
||||||
|
def test_entities_entity_ids_entity_id_true():
|
||||||
|
"""Test entity ID policy."""
|
||||||
|
policy = {
|
||||||
|
'entity_ids': {
|
||||||
|
'light.kitchen': True
|
||||||
|
}
|
||||||
|
}
|
||||||
|
permissions.ENTITY_POLICY_SCHEMA(policy)
|
||||||
|
compiled = permissions._compile_entities(policy)
|
||||||
|
assert compiled('light.kitchen', []) is True
|
||||||
|
assert compiled('switch.kitchen', []) is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_entities_entity_ids_entity_id_false():
|
||||||
|
"""Test entity ID policy."""
|
||||||
|
policy = {
|
||||||
|
'entity_ids': {
|
||||||
|
'light.kitchen': False
|
||||||
|
}
|
||||||
|
}
|
||||||
|
with pytest.raises(vol.Invalid):
|
||||||
|
permissions.ENTITY_POLICY_SCHEMA(policy)
|
||||||
|
|
||||||
|
|
||||||
|
def test_policy_perm_filter_states():
|
||||||
|
"""Test filtering entitites."""
|
||||||
|
states = [
|
||||||
|
State('light.kitchen', 'on'),
|
||||||
|
State('light.living_room', 'off'),
|
||||||
|
State('light.balcony', 'on'),
|
||||||
|
]
|
||||||
|
perm = permissions.PolicyPermissions({
|
||||||
|
'entities': {
|
||||||
|
'entity_ids': {
|
||||||
|
'light.kitchen': True,
|
||||||
|
'light.balcony': True,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
filtered = perm.filter_states(states)
|
||||||
|
assert len(filtered) == 2
|
||||||
|
assert filtered == [states[0], states[2]]
|
||||||
|
|
||||||
|
|
||||||
|
def test_owner_permissions():
|
||||||
|
"""Test owner permissions access all."""
|
||||||
|
assert permissions.OwnerPermissions.check_entity('light.kitchen')
|
||||||
|
states = [
|
||||||
|
State('light.kitchen', 'on'),
|
||||||
|
State('light.living_room', 'off'),
|
||||||
|
State('light.balcony', 'on'),
|
||||||
|
]
|
||||||
|
assert permissions.OwnerPermissions.filter_states(states) == states
|
||||||
|
|
||||||
|
|
||||||
|
def test_default_policy_allow_all():
|
||||||
|
"""Test that the default policy is to allow all entity actions."""
|
||||||
|
perm = permissions.PolicyPermissions(permissions.DEFAULT_POLICY)
|
||||||
|
assert perm.check_entity('light.kitchen')
|
||||||
|
states = [
|
||||||
|
State('light.kitchen', 'on'),
|
||||||
|
State('light.living_room', 'off'),
|
||||||
|
State('light.balcony', 'on'),
|
||||||
|
]
|
||||||
|
assert perm.filter_states(states) == states
|
||||||
|
|
||||||
|
|
||||||
|
def test_merging_permissions_true_rules_dict():
|
||||||
|
"""Test merging policy with two entities."""
|
||||||
|
policy1 = {
|
||||||
|
'something_else': True,
|
||||||
|
'entities': {
|
||||||
|
'entity_ids': {
|
||||||
|
'light.kitchen': True,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
policy2 = {
|
||||||
|
'entities': {
|
||||||
|
'entity_ids': True
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert permissions.merge_policies([policy1, policy2]) == {
|
||||||
|
'something_else': True,
|
||||||
|
'entities': {
|
||||||
|
'entity_ids': True
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def test_merging_permissions_multiple_subcategories():
|
||||||
|
"""Test merging policy with two entities."""
|
||||||
|
policy1 = {
|
||||||
|
'entities': None
|
||||||
|
}
|
||||||
|
policy2 = {
|
||||||
|
'entities': {
|
||||||
|
'entity_ids': True,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
policy3 = {
|
||||||
|
'entities': True
|
||||||
|
}
|
||||||
|
assert permissions.merge_policies([policy1, policy2]) == policy2
|
||||||
|
assert permissions.merge_policies([policy1, policy3]) == policy3
|
||||||
|
|
||||||
|
assert permissions.merge_policies([policy2, policy3]) == policy3
|
|
@ -348,10 +348,12 @@ def mock_device_registry(hass, mock_entries=None):
|
||||||
class MockGroup(auth_models.Group):
|
class MockGroup(auth_models.Group):
|
||||||
"""Mock a group in Home Assistant."""
|
"""Mock a group in Home Assistant."""
|
||||||
|
|
||||||
def __init__(self, id=None, name='Mock Group'):
|
def __init__(self, id=None, name='Mock Group',
|
||||||
|
policy=auth_store.DEFAULT_POLICY):
|
||||||
"""Mock a group."""
|
"""Mock a group."""
|
||||||
kwargs = {
|
kwargs = {
|
||||||
'name': name
|
'name': name,
|
||||||
|
'policy': policy,
|
||||||
}
|
}
|
||||||
if id is not None:
|
if id is not None:
|
||||||
kwargs['id'] = id
|
kwargs['id'] = id
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue