Add entity category support to HomeKit (#64492)

This commit is contained in:
J. Nick Koston 2022-01-19 21:48:50 -10:00 committed by GitHub
parent e248ef1dd7
commit d53124910f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 447 additions and 152 deletions

View file

@ -6,7 +6,7 @@ from copy import deepcopy
import random
import re
import string
from typing import Final
from typing import Any, Final
import voluptuous as vol
@ -25,9 +25,10 @@ from homeassistant.const import (
CONF_ENTITY_ID,
CONF_NAME,
CONF_PORT,
ENTITY_CATEGORIES,
)
from homeassistant.core import HomeAssistant, callback, split_entity_id
from homeassistant.helpers import device_registry
from homeassistant.helpers import device_registry, entity_registry
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entityfilter import (
CONF_EXCLUDE_DOMAINS,
@ -127,6 +128,38 @@ _EMPTY_ENTITY_FILTER: Final = {
}
async def _async_domain_names(hass: HomeAssistant, domains: list[str]) -> str:
"""Build a list of integration names from domains."""
name_to_type_map = await _async_name_to_type_map(hass)
return ", ".join(
[name for domain, name in name_to_type_map.items() if domain in domains]
)
@callback
def _async_build_entites_filter(
domains: list[str], entities: list[str]
) -> dict[str, Any]:
"""Build an entities filter from domains and entities."""
entity_filter = deepcopy(_EMPTY_ENTITY_FILTER)
entity_filter[CONF_INCLUDE_ENTITIES] = entities
# Include all of the domain if there are no entities
# explicitly included as the user selected the domain
domains_with_entities_selected = _domains_set_from_entities(entities)
entity_filter[CONF_INCLUDE_DOMAINS] = [
domain for domain in domains if domain not in domains_with_entities_selected
]
return entity_filter
def _async_cameras_from_entities(entities: list[str]) -> set[str]:
return {
entity_id
for entity_id in entities
if entity_id.startswith(CAMERA_ENTITY_PREFIX)
}
async def _async_name_to_type_map(hass: HomeAssistant) -> dict[str, str]:
"""Create a mapping of types of devices/entities HomeKit can support."""
integrations = await asyncio.gather(
@ -331,6 +364,9 @@ class OptionsFlowHandler(config_entries.OptionsFlow):
):
self.hk_options[CONF_DEVICES] = user_input[CONF_DEVICES]
if CONF_INCLUDE_EXCLUDE_MODE in self.hk_options:
del self.hk_options[CONF_INCLUDE_EXCLUDE_MODE]
return self.async_create_entry(title="", data=self.hk_options)
all_supported_devices = await _async_get_supported_devices(self.hass)
@ -398,99 +434,139 @@ class OptionsFlowHandler(config_entries.OptionsFlow):
)
return self.async_show_form(step_id="cameras", data_schema=data_schema)
async def async_step_include_exclude(self, user_input=None):
"""Choose entities to include or exclude from the domain."""
async def async_step_accessory(self, user_input=None):
"""Choose entity for the accessory."""
domains = self.hk_options[CONF_DOMAINS]
if user_input is not None:
entity_filter = _EMPTY_ENTITY_FILTER.copy()
if isinstance(user_input[CONF_ENTITIES], list):
entities = user_input[CONF_ENTITIES]
else:
entities = [user_input[CONF_ENTITIES]]
if (
self.hk_options[CONF_HOMEKIT_MODE] == HOMEKIT_MODE_ACCESSORY
or user_input[CONF_INCLUDE_EXCLUDE_MODE] == MODE_INCLUDE
):
entity_filter[CONF_INCLUDE_ENTITIES] = entities
# Include all of the domain if there are no entities
# explicitly included as the user selected the domain
domains_with_entities_selected = _domains_set_from_entities(entities)
entity_filter[CONF_INCLUDE_DOMAINS] = [
domain
for domain in self.hk_options[CONF_DOMAINS]
if domain not in domains_with_entities_selected
]
self.included_cameras = {
entity_id
for entity_id in entities
if entity_id.startswith(CAMERA_ENTITY_PREFIX)
}
else:
entity_filter[CONF_INCLUDE_DOMAINS] = self.hk_options[CONF_DOMAINS]
entity_filter[CONF_EXCLUDE_ENTITIES] = entities
if CAMERA_DOMAIN in entity_filter[CONF_INCLUDE_DOMAINS]:
camera_entities = _async_get_matching_entities(
self.hass,
domains=[CAMERA_DOMAIN],
)
self.included_cameras = {
entity_id
for entity_id in camera_entities
if entity_id not in entities
}
else:
self.included_cameras = set()
entities = cv.ensure_list(user_input[CONF_ENTITIES])
entity_filter = _async_build_entites_filter(domains, entities)
self.included_cameras = _async_cameras_from_entities(entities)
self.hk_options[CONF_FILTER] = entity_filter
if self.included_cameras:
return await self.async_step_cameras()
return await self.async_step_advanced()
entity_filter = self.hk_options.get(CONF_FILTER, {})
entities = entity_filter.get(CONF_INCLUDE_ENTITIES, [])
all_supported_entities = _async_get_matching_entities(self.hass, domains)
# In accessory mode we can only have one
default_value = next(
iter(
entity_id
for entity_id in entities
if entity_id in all_supported_entities
),
None,
)
return self.async_show_form(
step_id="accessory",
data_schema=vol.Schema(
{
vol.Required(CONF_ENTITIES, default=default_value): vol.In(
all_supported_entities
)
}
),
)
async def async_step_include(self, user_input=None):
"""Choose entities to include from the domain on the bridge."""
domains = self.hk_options[CONF_DOMAINS]
if user_input is not None:
entities = cv.ensure_list(user_input[CONF_ENTITIES])
entity_filter = _async_build_entites_filter(domains, entities)
self.included_cameras = _async_cameras_from_entities(entities)
self.hk_options[CONF_FILTER] = entity_filter
if self.included_cameras:
return await self.async_step_cameras()
return await self.async_step_advanced()
entity_filter = self.hk_options.get(CONF_FILTER, {})
entities = entity_filter.get(CONF_INCLUDE_ENTITIES, [])
all_supported_entities = _async_get_matching_entities(
self.hass,
domains=self.hk_options[CONF_DOMAINS],
)
data_schema = {}
if self.hk_options[CONF_HOMEKIT_MODE] == HOMEKIT_MODE_ACCESSORY:
# In accessory mode we can only have one
default_value = next(
iter(
entity_id
for entity_id in entities
if entity_id in all_supported_entities
),
None,
)
entity_schema = vol.In
entities_schema_required = vol.Required
else:
# Bridge mode
entities_schema_required = vol.Optional
include_exclude_mode = MODE_INCLUDE
if not entities:
include_exclude_mode = MODE_EXCLUDE
entities = entity_filter.get(CONF_EXCLUDE_ENTITIES, [])
data_schema[
vol.Required(CONF_INCLUDE_EXCLUDE_MODE, default=include_exclude_mode)
] = vol.In(INCLUDE_EXCLUDE_MODES)
entity_schema = cv.multi_select
# Strip out entities that no longer exist to prevent error in the UI
default_value = [
entity_id
for entity_id in entities
if entity_id in all_supported_entities
]
all_supported_entities = _async_get_matching_entities(self.hass, domains)
if not entities:
entities = entity_filter.get(CONF_EXCLUDE_ENTITIES, [])
# Strip out entities that no longer exist to prevent error in the UI
default_value = [
entity_id for entity_id in entities if entity_id in all_supported_entities
]
data_schema[
entities_schema_required(CONF_ENTITIES, default=default_value)
] = entity_schema(all_supported_entities)
return self.async_show_form(
step_id="include_exclude", data_schema=vol.Schema(data_schema)
step_id="include",
description_placeholders={
"domains": await _async_domain_names(self.hass, domains)
},
data_schema=vol.Schema(
{
vol.Optional(CONF_ENTITIES, default=default_value): cv.multi_select(
all_supported_entities
)
}
),
)
async def async_step_exclude(self, user_input=None):
"""Choose entities to exclude from the domain on the bridge."""
domains = self.hk_options[CONF_DOMAINS]
if user_input is not None:
entity_filter = deepcopy(_EMPTY_ENTITY_FILTER)
entities = cv.ensure_list(user_input[CONF_ENTITIES])
entity_filter[CONF_INCLUDE_DOMAINS] = domains
entity_filter[CONF_EXCLUDE_ENTITIES] = entities
self.included_cameras = set()
if CAMERA_DOMAIN in entity_filter[CONF_INCLUDE_DOMAINS]:
camera_entities = _async_get_matching_entities(
self.hass, [CAMERA_DOMAIN]
)
self.included_cameras = {
entity_id
for entity_id in camera_entities
if entity_id not in entities
}
self.hk_options[CONF_FILTER] = entity_filter
if self.included_cameras:
return await self.async_step_cameras()
return await self.async_step_advanced()
entity_filter = self.hk_options.get(CONF_FILTER, {})
entities = entity_filter.get(CONF_INCLUDE_ENTITIES, [])
all_supported_entities = _async_get_matching_entities(self.hass, domains)
if not entities:
entities = entity_filter.get(CONF_EXCLUDE_ENTITIES, [])
ent_reg = entity_registry.async_get(self.hass)
entity_cat_entities = set()
for entity_id in all_supported_entities:
if ent_reg_ent := ent_reg.async_get(entity_id):
if ent_reg_ent.entity_category in ENTITY_CATEGORIES:
entity_cat_entities.add(entity_id)
# Remove entity category entities since we will exclude them anyways
all_supported_entities = {
k: v
for k, v in all_supported_entities.items()
if k not in entity_cat_entities
}
# Strip out entities that no longer exist to prevent error in the UI
default_value = [
entity_id for entity_id in entities if entity_id in all_supported_entities
]
return self.async_show_form(
step_id="exclude",
description_placeholders={
"domains": await _async_domain_names(self.hass, domains)
},
data_schema=vol.Schema(
{
vol.Optional(CONF_ENTITIES, default=default_value): cv.multi_select(
all_supported_entities
)
}
),
)
async def async_step_init(self, user_input=None):
@ -500,17 +576,24 @@ class OptionsFlowHandler(config_entries.OptionsFlow):
if user_input is not None:
self.hk_options.update(user_input)
return await self.async_step_include_exclude()
if self.hk_options.get(CONF_HOMEKIT_MODE) == HOMEKIT_MODE_ACCESSORY:
return await self.async_step_accessory()
if user_input[CONF_INCLUDE_EXCLUDE_MODE] == MODE_INCLUDE:
return await self.async_step_include()
return await self.async_step_exclude()
self.hk_options = deepcopy(dict(self.config_entry.options))
entity_filter = self.hk_options.get(CONF_FILTER, {})
homekit_mode = self.hk_options.get(CONF_HOMEKIT_MODE, DEFAULT_HOMEKIT_MODE)
entity_filter = self.hk_options.get(CONF_FILTER, {})
include_exclude_mode = MODE_INCLUDE
entities = entity_filter.get(CONF_INCLUDE_ENTITIES, [])
if homekit_mode != HOMEKIT_MODE_ACCESSORY:
include_exclude_mode = MODE_INCLUDE if entities else MODE_EXCLUDE
domains = entity_filter.get(CONF_INCLUDE_DOMAINS, [])
include_entities = entity_filter.get(CONF_INCLUDE_ENTITIES)
if include_entities:
domains.extend(_domains_set_from_entities(include_entities))
name_to_type_map = await _async_name_to_type_map(self.hass)
return self.async_show_form(
step_id="init",
data_schema=vol.Schema(
@ -518,6 +601,9 @@ class OptionsFlowHandler(config_entries.OptionsFlow):
vol.Required(CONF_HOMEKIT_MODE, default=homekit_mode): vol.In(
HOMEKIT_MODES
),
vol.Required(
CONF_INCLUDE_EXCLUDE_MODE, default=include_exclude_mode
): vol.In(INCLUDE_EXCLUDE_MODES),
vol.Required(
CONF_DOMAINS,
default=domains,
@ -540,7 +626,9 @@ async def _async_get_supported_devices(hass):
return dict(sorted(unsorted.items(), key=lambda item: item[1]))
def _async_get_matching_entities(hass, domains=None):
def _async_get_matching_entities(
hass: HomeAssistant, domains: list[str] | None = None
) -> dict[str, str]:
"""Fetch all entities or entities in the given domains."""
return {
state.entity_id: f"{state.attributes.get(ATTR_FRIENDLY_NAME, state.entity_id)} ({state.entity_id})"