Use registry to find linked batteries for homekit (#33519)

This commit is contained in:
J. Nick Koston 2020-04-21 19:43:49 -05:00 committed by GitHub
parent 46920e9be6
commit 96649a7e27
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 555 additions and 127 deletions

View file

@ -7,10 +7,13 @@ import voluptuous as vol
from zeroconf import InterfaceChoice
from homeassistant.components import cover, vacuum
from homeassistant.components.binary_sensor import DEVICE_CLASS_BATTERY_CHARGING
from homeassistant.components.cover import DEVICE_CLASS_GARAGE, DEVICE_CLASS_GATE
from homeassistant.components.http import HomeAssistantView
from homeassistant.components.media_player import DEVICE_CLASS_TV
from homeassistant.const import (
ATTR_BATTERY_CHARGING,
ATTR_BATTERY_LEVEL,
ATTR_DEVICE_CLASS,
ATTR_ENTITY_ID,
ATTR_SERVICE,
@ -20,6 +23,7 @@ from homeassistant.const import (
CONF_NAME,
CONF_PORT,
CONF_TYPE,
DEVICE_CLASS_BATTERY,
DEVICE_CLASS_HUMIDITY,
DEVICE_CLASS_ILLUMINANCE,
DEVICE_CLASS_TEMPERATURE,
@ -31,6 +35,7 @@ from homeassistant.const import (
)
from homeassistant.core import callback
from homeassistant.exceptions import Unauthorized
from homeassistant.helpers import entity_registry
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entityfilter import FILTER_SCHEMA
from homeassistant.util import get_local_ip
@ -47,6 +52,8 @@ from .const import (
CONF_ENTITY_CONFIG,
CONF_FEATURE_LIST,
CONF_FILTER,
CONF_LINKED_BATTERY_CHARGING_SENSOR,
CONF_LINKED_BATTERY_SENSOR,
CONF_SAFE_MODE,
CONF_ZEROCONF_DEFAULT_INTERFACE,
DEFAULT_AUTO_START,
@ -202,10 +209,10 @@ async def async_setup(hass, config):
)
if auto_start:
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_START, homekit.start)
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_START, homekit.async_start)
return True
def handle_homekit_service_start(service):
async def async_handle_homekit_service_start(service):
"""Handle start HomeKit service call."""
if homekit.status != STATUS_READY:
_LOGGER.warning(
@ -213,10 +220,10 @@ async def async_setup(hass, config):
"been stopped."
)
return
homekit.start()
await homekit.async_start()
hass.services.async_register(
DOMAIN, SERVICE_HOMEKIT_START, handle_homekit_service_start
DOMAIN, SERVICE_HOMEKIT_START, async_handle_homekit_service_start
)
return True
@ -355,7 +362,7 @@ class HomeKit:
# pylint: disable=import-outside-toplevel
from .accessories import HomeBridge, HomeDriver
self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self.stop)
self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self.async_stop)
ip_addr = self._ip_address or get_local_ip()
path = self.hass.config.path(HOMEKIT_FILE)
@ -393,7 +400,7 @@ class HomeKit:
def add_bridge_accessory(self, state):
"""Try adding accessory to bridge if configured beforehand."""
if not state or not self._filter(state.entity_id):
if not self._filter(state.entity_id):
return
# The bridge itself counts as an accessory
@ -428,12 +435,32 @@ class HomeKit:
acc = self.bridge.accessories.pop(aid)
return acc
def start(self, *args):
async def async_start(self, *args):
"""Start the accessory driver."""
if self.status != STATUS_READY:
return
self.status = STATUS_WAIT
ent_reg = await entity_registry.async_get_registry(self.hass)
device_lookup = ent_reg.async_get_device_class_lookup(
{
("binary_sensor", DEVICE_CLASS_BATTERY_CHARGING),
("sensor", DEVICE_CLASS_BATTERY),
}
)
bridged_states = []
for state in self.hass.states.async_all():
if not self._filter(state.entity_id):
continue
self._async_configure_linked_battery_sensors(ent_reg, device_lookup, state)
bridged_states.append(state)
await self.hass.async_add_executor_job(self._start, bridged_states)
def _start(self, bridged_states):
from . import ( # noqa: F401 pylint: disable=unused-import, import-outside-toplevel
type_covers,
type_fans,
@ -446,7 +473,7 @@ class HomeKit:
type_thermostats,
)
for state in self.hass.states.all():
for state in bridged_states:
self.add_bridge_accessory(state)
self.driver.add_accessory(self.bridge)
@ -457,17 +484,49 @@ class HomeKit:
)
_LOGGER.debug("Driver start")
self.hass.add_job(self.driver.start)
self.hass.async_add_executor_job(self.driver.start)
self.status = STATUS_RUNNING
def stop(self, *args):
async def async_stop(self, *args):
"""Stop the accessory driver."""
if self.status != STATUS_RUNNING:
return
self.status = STATUS_STOPPED
_LOGGER.debug("Driver stop")
self.hass.add_job(self.driver.stop)
self.hass.async_add_executor_job(self.driver.stop)
@callback
def _async_configure_linked_battery_sensors(self, ent_reg, device_lookup, state):
entry = ent_reg.async_get(state.entity_id)
if (
entry is None
or entry.device_id is None
or entry.device_id not in device_lookup
or entry.device_class
in (DEVICE_CLASS_BATTERY_CHARGING, DEVICE_CLASS_BATTERY)
):
return
if ATTR_BATTERY_CHARGING not in state.attributes:
battery_charging_binary_sensor_entity_id = device_lookup[
entry.device_id
].get(("binary_sensor", DEVICE_CLASS_BATTERY_CHARGING))
if battery_charging_binary_sensor_entity_id:
self._config.setdefault(state.entity_id, {}).setdefault(
CONF_LINKED_BATTERY_CHARGING_SENSOR,
battery_charging_binary_sensor_entity_id,
)
if ATTR_BATTERY_LEVEL not in state.attributes:
battery_sensor_entity_id = device_lookup[entry.device_id].get(
("sensor", DEVICE_CLASS_BATTERY)
)
if battery_sensor_entity_id:
self._config.setdefault(state.entity_id, {}).setdefault(
CONF_LINKED_BATTERY_SENSOR, battery_sensor_entity_id
)
class HomeKitPairingQRView(HomeAssistantView):

View file

@ -13,6 +13,7 @@ from homeassistant.const import (
ATTR_BATTERY_LEVEL,
ATTR_ENTITY_ID,
ATTR_SERVICE,
STATE_ON,
__version__,
)
from homeassistant.core import callback as ha_callback, split_entity_id
@ -30,11 +31,15 @@ from .const import (
CHAR_BATTERY_LEVEL,
CHAR_CHARGING_STATE,
CHAR_STATUS_LOW_BATTERY,
CONF_LINKED_BATTERY_CHARGING_SENSOR,
CONF_LINKED_BATTERY_SENSOR,
CONF_LOW_BATTERY_THRESHOLD,
DEBOUNCE_TIMEOUT,
DEFAULT_LOW_BATTERY_THRESHOLD,
EVENT_HOMEKIT_CHANGED,
HK_CHARGING,
HK_NOT_CHARGABLE,
HK_NOT_CHARGING,
MANUFACTURER,
SERV_BATTERY_SERVICE,
)
@ -94,17 +99,17 @@ class HomeAccessory(Accessory):
self.entity_id = entity_id
self.hass = hass
self.debounce = {}
self._support_battery_level = False
self._support_battery_charging = True
self.linked_battery_sensor = self.config.get(CONF_LINKED_BATTERY_SENSOR)
self.linked_battery_charging_sensor = self.config.get(
CONF_LINKED_BATTERY_CHARGING_SENSOR
)
self.low_battery_threshold = self.config.get(
CONF_LOW_BATTERY_THRESHOLD, DEFAULT_LOW_BATTERY_THRESHOLD
)
"""Add battery service if available"""
battery_found = self.hass.states.get(self.entity_id).attributes.get(
ATTR_BATTERY_LEVEL
)
entity_attributes = self.hass.states.get(self.entity_id).attributes
battery_found = entity_attributes.get(ATTR_BATTERY_LEVEL)
if self.linked_battery_sensor:
state = self.hass.states.get(self.linked_battery_sensor)
@ -118,13 +123,28 @@ class HomeAccessory(Accessory):
self.linked_battery_sensor,
)
if battery_found is None:
if not battery_found:
return
_LOGGER.debug("%s: Found battery level", self.entity_id)
self._support_battery_level = True
if self.linked_battery_charging_sensor:
state = self.hass.states.get(self.linked_battery_charging_sensor)
if state is None:
self.linked_battery_charging_sensor = None
_LOGGER.warning(
"%s: Battery charging binary_sensor state missing: %s",
self.entity_id,
self.linked_battery_charging_sensor,
)
else:
_LOGGER.debug("%s: Found battery charging", self.entity_id)
serv_battery = self.add_preload_service(SERV_BATTERY_SERVICE)
self._char_battery = serv_battery.configure_char(CHAR_BATTERY_LEVEL, value=0)
self._char_charging = serv_battery.configure_char(CHAR_CHARGING_STATE, value=2)
self._char_charging = serv_battery.configure_char(
CHAR_CHARGING_STATE, value=HK_NOT_CHARGABLE
)
self._char_low_battery = serv_battery.configure_char(
CHAR_STATUS_LOW_BATTERY, value=0
)
@ -142,17 +162,41 @@ class HomeAccessory(Accessory):
Run inside the Home Assistant event loop.
"""
state = self.hass.states.get(self.entity_id)
self.hass.async_add_job(self.update_state_callback, None, None, state)
self.hass.async_add_executor_job(self.update_state_callback, None, None, state)
async_track_state_change(self.hass, self.entity_id, self.update_state_callback)
battery_charging_state = None
battery_state = None
if self.linked_battery_sensor:
battery_state = self.hass.states.get(self.linked_battery_sensor)
self.hass.async_add_job(
self.update_linked_battery, None, None, battery_state
linked_battery_sensor_state = self.hass.states.get(
self.linked_battery_sensor
)
battery_state = linked_battery_sensor_state.state
battery_charging_state = linked_battery_sensor_state.attributes.get(
ATTR_BATTERY_CHARGING
)
async_track_state_change(
self.hass, self.linked_battery_sensor, self.update_linked_battery
)
else:
battery_state = state.attributes.get(ATTR_BATTERY_LEVEL)
if self.linked_battery_charging_sensor:
battery_charging_state = (
self.hass.states.get(self.linked_battery_charging_sensor).state
== STATE_ON
)
async_track_state_change(
self.hass,
self.linked_battery_charging_sensor,
self.update_linked_battery_charging,
)
elif battery_charging_state is None:
battery_charging_state = state.attributes.get(ATTR_BATTERY_CHARGING)
if battery_state is not None or battery_charging_state is not None:
self.hass.async_add_executor_job(
self.update_battery, battery_state, battery_charging_state
)
@ha_callback
def update_state_callback(self, entity_id=None, old_state=None, new_state=None):
@ -160,37 +204,69 @@ class HomeAccessory(Accessory):
_LOGGER.debug("New_state: %s", new_state)
if new_state is None:
return
if self._support_battery_level and not self.linked_battery_sensor:
self.hass.async_add_executor_job(self.update_battery, new_state)
battery_state = None
battery_charging_state = None
if (
not self.linked_battery_sensor
and ATTR_BATTERY_LEVEL in new_state.attributes
):
battery_state = new_state.attributes.get(ATTR_BATTERY_LEVEL)
if (
not self.linked_battery_charging_sensor
and ATTR_BATTERY_CHARGING in new_state.attributes
):
battery_charging_state = new_state.attributes.get(ATTR_BATTERY_CHARGING)
if battery_state is not None or battery_charging_state is not None:
self.hass.async_add_executor_job(
self.update_battery, battery_state, battery_charging_state
)
self.hass.async_add_executor_job(self.update_state, new_state)
@ha_callback
def update_linked_battery(self, entity_id=None, old_state=None, new_state=None):
"""Handle linked battery sensor state change listener callback."""
self.hass.async_add_executor_job(self.update_battery, new_state)
if self.linked_battery_charging_sensor:
battery_charging_state = None
else:
battery_charging_state = new_state.attributes.get(ATTR_BATTERY_CHARGING)
self.hass.async_add_executor_job(
self.update_battery, new_state.state, battery_charging_state,
)
def update_battery(self, new_state):
@ha_callback
def update_linked_battery_charging(
self, entity_id=None, old_state=None, new_state=None
):
"""Handle linked battery charging sensor state change listener callback."""
self.hass.async_add_executor_job(
self.update_battery, None, new_state.state == STATE_ON
)
def update_battery(self, battery_level, battery_charging):
"""Update battery service if available.
Only call this function if self._support_battery_level is True.
"""
battery_level = convert_to_float(new_state.attributes.get(ATTR_BATTERY_LEVEL))
if self.linked_battery_sensor:
battery_level = convert_to_float(new_state.state)
if battery_level is None:
battery_level = convert_to_float(battery_level)
if battery_level is not None:
if self._char_battery.value != battery_level:
self._char_battery.set_value(battery_level)
is_low_battery = 1 if battery_level < self.low_battery_threshold else 0
if self._char_low_battery.value != is_low_battery:
self._char_low_battery.set_value(is_low_battery)
_LOGGER.debug(
"%s: Updated battery level to %d", self.entity_id, battery_level
)
if battery_charging is None:
return
self._char_battery.set_value(battery_level)
self._char_low_battery.set_value(battery_level < self.low_battery_threshold)
_LOGGER.debug("%s: Updated battery level to %d", self.entity_id, battery_level)
if not self._support_battery_charging:
return
charging = new_state.attributes.get(ATTR_BATTERY_CHARGING)
if charging is None:
self._support_battery_charging = False
return
hk_charging = 1 if charging is True else 0
self._char_charging.set_value(hk_charging)
_LOGGER.debug("%s: Updated battery charging to %d", self.entity_id, hk_charging)
hk_charging = HK_CHARGING if battery_charging else HK_NOT_CHARGING
if self._char_charging.value != hk_charging:
self._char_charging.set_value(hk_charging)
_LOGGER.debug(
"%s: Updated battery charging to %d", self.entity_id, hk_charging
)
def update_state(self, new_state):
"""Handle state change to update HomeKit value.

View file

@ -21,6 +21,7 @@ CONF_FEATURE = "feature"
CONF_FEATURE_LIST = "feature_list"
CONF_FILTER = "filter"
CONF_LINKED_BATTERY_SENSOR = "linked_battery_sensor"
CONF_LINKED_BATTERY_CHARGING_SENSOR = "linked_battery_charging_sensor"
CONF_LOW_BATTERY_THRESHOLD = "low_battery_threshold"
CONF_SAFE_MODE = "safe_mode"
CONF_ZEROCONF_DEFAULT_INTERFACE = "zeroconf_default_interface"
@ -197,3 +198,8 @@ HK_DOOR_STOPPED = 4
HK_POSITION_GOING_TO_MIN = 0
HK_POSITION_GOING_TO_MAX = 1
HK_POSITION_STOPPED = 2
# ### Charging State ###
HK_NOT_CHARGING = 0
HK_CHARGING = 1
HK_NOT_CHARGABLE = 2

View file

@ -100,12 +100,6 @@ class MediaPlayer(HomeAccessory):
"""Initialize a Switch accessory object."""
super().__init__(*args, category=CATEGORY_SWITCH)
state = self.hass.states.get(self.entity_id)
self._flag = {
FEATURE_ON_OFF: False,
FEATURE_PLAY_PAUSE: False,
FEATURE_PLAY_STOP: False,
FEATURE_TOGGLE_MUTE: False,
}
self.chars = {
FEATURE_ON_OFF: None,
FEATURE_PLAY_PAUSE: None,
@ -154,7 +148,6 @@ class MediaPlayer(HomeAccessory):
def set_on_off(self, value):
"""Move switch state to value if call came from HomeKit."""
_LOGGER.debug('%s: Set switch state for "on_off" to %s', self.entity_id, value)
self._flag[FEATURE_ON_OFF] = True
service = SERVICE_TURN_ON if value else SERVICE_TURN_OFF
params = {ATTR_ENTITY_ID: self.entity_id}
self.call_service(DOMAIN, service, params)
@ -164,7 +157,6 @@ class MediaPlayer(HomeAccessory):
_LOGGER.debug(
'%s: Set switch state for "play_pause" to %s', self.entity_id, value
)
self._flag[FEATURE_PLAY_PAUSE] = True
service = SERVICE_MEDIA_PLAY if value else SERVICE_MEDIA_PAUSE
params = {ATTR_ENTITY_ID: self.entity_id}
self.call_service(DOMAIN, service, params)
@ -174,7 +166,6 @@ class MediaPlayer(HomeAccessory):
_LOGGER.debug(
'%s: Set switch state for "play_stop" to %s', self.entity_id, value
)
self._flag[FEATURE_PLAY_STOP] = True
service = SERVICE_MEDIA_PLAY if value else SERVICE_MEDIA_STOP
params = {ATTR_ENTITY_ID: self.entity_id}
self.call_service(DOMAIN, service, params)
@ -184,7 +175,6 @@ class MediaPlayer(HomeAccessory):
_LOGGER.debug(
'%s: Set switch state for "toggle_mute" to %s', self.entity_id, value
)
self._flag[FEATURE_TOGGLE_MUTE] = True
params = {ATTR_ENTITY_ID: self.entity_id, ATTR_MEDIA_VOLUME_MUTED: value}
self.call_service(DOMAIN, SERVICE_VOLUME_MUTE, params)
@ -199,49 +189,39 @@ class MediaPlayer(HomeAccessory):
STATE_STANDBY,
"None",
)
if not self._flag[FEATURE_ON_OFF]:
_LOGGER.debug(
'%s: Set current state for "on_off" to %s', self.entity_id, hk_state
)
if self.chars[FEATURE_ON_OFF].value != hk_state:
self.chars[FEATURE_ON_OFF].set_value(hk_state)
self._flag[FEATURE_ON_OFF] = False
_LOGGER.debug(
'%s: Set current state for "on_off" to %s', self.entity_id, hk_state
)
if self.chars[FEATURE_ON_OFF].value != hk_state:
self.chars[FEATURE_ON_OFF].set_value(hk_state)
if self.chars[FEATURE_PLAY_PAUSE]:
hk_state = current_state == STATE_PLAYING
if not self._flag[FEATURE_PLAY_PAUSE]:
_LOGGER.debug(
'%s: Set current state for "play_pause" to %s',
self.entity_id,
hk_state,
)
if self.chars[FEATURE_PLAY_PAUSE].value != hk_state:
self.chars[FEATURE_PLAY_PAUSE].set_value(hk_state)
self._flag[FEATURE_PLAY_PAUSE] = False
_LOGGER.debug(
'%s: Set current state for "play_pause" to %s',
self.entity_id,
hk_state,
)
if self.chars[FEATURE_PLAY_PAUSE].value != hk_state:
self.chars[FEATURE_PLAY_PAUSE].set_value(hk_state)
if self.chars[FEATURE_PLAY_STOP]:
hk_state = current_state == STATE_PLAYING
if not self._flag[FEATURE_PLAY_STOP]:
_LOGGER.debug(
'%s: Set current state for "play_stop" to %s',
self.entity_id,
hk_state,
)
if self.chars[FEATURE_PLAY_STOP].value != hk_state:
self.chars[FEATURE_PLAY_STOP].set_value(hk_state)
self._flag[FEATURE_PLAY_STOP] = False
_LOGGER.debug(
'%s: Set current state for "play_stop" to %s', self.entity_id, hk_state,
)
if self.chars[FEATURE_PLAY_STOP].value != hk_state:
self.chars[FEATURE_PLAY_STOP].set_value(hk_state)
if self.chars[FEATURE_TOGGLE_MUTE]:
current_state = new_state.attributes.get(ATTR_MEDIA_VOLUME_MUTED)
if not self._flag[FEATURE_TOGGLE_MUTE]:
_LOGGER.debug(
'%s: Set current state for "toggle_mute" to %s',
self.entity_id,
current_state,
)
if self.chars[FEATURE_TOGGLE_MUTE].value != current_state:
self.chars[FEATURE_TOGGLE_MUTE].set_value(current_state)
self._flag[FEATURE_TOGGLE_MUTE] = False
_LOGGER.debug(
'%s: Set current state for "toggle_mute" to %s',
self.entity_id,
current_state,
)
if self.chars[FEATURE_TOGGLE_MUTE].value != current_state:
self.chars[FEATURE_TOGGLE_MUTE].set_value(current_state)
@TYPES.register("TelevisionMediaPlayer")
@ -253,11 +233,6 @@ class TelevisionMediaPlayer(HomeAccessory):
super().__init__(*args, category=CATEGORY_TELEVISION)
state = self.hass.states.get(self.entity_id)
self._flag = {
CHAR_ACTIVE: False,
CHAR_ACTIVE_IDENTIFIER: False,
CHAR_MUTE: False,
}
self.support_select_source = False
self.sources = []
@ -348,7 +323,6 @@ class TelevisionMediaPlayer(HomeAccessory):
def set_on_off(self, value):
"""Move switch state to value if call came from HomeKit."""
_LOGGER.debug('%s: Set switch state for "on_off" to %s', self.entity_id, value)
self._flag[CHAR_ACTIVE] = True
service = SERVICE_TURN_ON if value else SERVICE_TURN_OFF
params = {ATTR_ENTITY_ID: self.entity_id}
self.call_service(DOMAIN, service, params)
@ -358,7 +332,6 @@ class TelevisionMediaPlayer(HomeAccessory):
_LOGGER.debug(
'%s: Set switch state for "toggle_mute" to %s', self.entity_id, value
)
self._flag[CHAR_MUTE] = True
params = {ATTR_ENTITY_ID: self.entity_id, ATTR_MEDIA_VOLUME_MUTED: value}
self.call_service(DOMAIN, SERVICE_VOLUME_MUTE, params)
@ -379,7 +352,6 @@ class TelevisionMediaPlayer(HomeAccessory):
"""Send input set value if call came from HomeKit."""
_LOGGER.debug("%s: Set current input to %s", self.entity_id, value)
source = self.sources[value]
self._flag[CHAR_ACTIVE_IDENTIFIER] = True
params = {ATTR_ENTITY_ID: self.entity_id, ATTR_INPUT_SOURCE: source}
self.call_service(DOMAIN, SERVICE_SELECT_SOURCE, params)
@ -409,31 +381,23 @@ class TelevisionMediaPlayer(HomeAccessory):
if current_state not in ("None", STATE_OFF, STATE_UNKNOWN):
hk_state = 1
if not self._flag[CHAR_ACTIVE]:
_LOGGER.debug(
"%s: Set current active state to %s", self.entity_id, hk_state
)
if self.char_active.value != hk_state:
self.char_active.set_value(hk_state)
self._flag[CHAR_ACTIVE] = False
_LOGGER.debug("%s: Set current active state to %s", self.entity_id, hk_state)
if self.char_active.value != hk_state:
self.char_active.set_value(hk_state)
# Set mute state
if CHAR_VOLUME_SELECTOR in self.chars_speaker:
current_mute_state = new_state.attributes.get(ATTR_MEDIA_VOLUME_MUTED)
if not self._flag[CHAR_MUTE]:
_LOGGER.debug(
"%s: Set current mute state to %s",
self.entity_id,
current_mute_state,
)
if self.char_mute.value != current_mute_state:
self.char_mute.set_value(current_mute_state)
self._flag[CHAR_MUTE] = False
_LOGGER.debug(
"%s: Set current mute state to %s", self.entity_id, current_mute_state,
)
if self.char_mute.value != current_mute_state:
self.char_mute.set_value(current_mute_state)
# Set active input
if self.support_select_source:
source_name = new_state.attributes.get(ATTR_INPUT_SOURCE)
if self.sources and not self._flag[CHAR_ACTIVE_IDENTIFIER]:
if self.sources:
_LOGGER.debug(
"%s: Set current input to %s", self.entity_id, source_name
)
@ -448,4 +412,3 @@ class TelevisionMediaPlayer(HomeAccessory):
)
if self.char_input_source.value != 0:
self.char_input_source.set_value(0)
self._flag[CHAR_ACTIVE_IDENTIFIER] = False

View file

@ -10,7 +10,17 @@ timer.
import asyncio
from collections import OrderedDict
import logging
from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, List, Optional, cast
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Iterable,
List,
Optional,
Tuple,
cast,
)
import attr
@ -109,6 +119,22 @@ class EntityRegistry:
EVENT_DEVICE_REGISTRY_UPDATED, self.async_device_removed
)
@callback
def async_get_device_class_lookup(self, domain_device_classes: set) -> dict:
"""Return a lookup for the device class by domain."""
lookup: Dict[str, Dict[Tuple[Any, Any], str]] = {}
for entity in self.entities.values():
if not entity.device_id:
continue
domain_device_class = (entity.domain, entity.device_class)
if domain_device_class not in domain_device_classes:
continue
if entity.device_id not in lookup:
lookup[entity.device_id] = {domain_device_class: entity.entity_id}
else:
lookup[entity.device_id][domain_device_class] = entity.entity_id
return lookup
@callback
def async_is_registered(self, entity_id: str) -> bool:
"""Check if an entity_id is currently registered."""

View file

@ -24,6 +24,7 @@ from homeassistant.components.homekit.const import (
CHAR_MODEL,
CHAR_NAME,
CHAR_SERIAL_NUMBER,
CONF_LINKED_BATTERY_CHARGING_SENSOR,
CONF_LINKED_BATTERY_SENSOR,
CONF_LOW_BATTERY_THRESHOLD,
MANUFACTURER,
@ -36,6 +37,8 @@ from homeassistant.const import (
ATTR_NOW,
ATTR_SERVICE,
EVENT_TIME_CHANGED,
STATE_OFF,
STATE_ON,
__version__,
)
import homeassistant.util.dt as dt_util
@ -245,6 +248,99 @@ async def test_linked_battery_sensor(hass, hk_driver, caplog):
assert acc._char_charging.value == 0
async def test_linked_battery_charging_sensor(hass, hk_driver, caplog):
"""Test battery service with linked_battery_charging_sensor."""
entity_id = "homekit.accessory"
linked_battery_charging_sensor = "binary_sensor.battery_charging"
hass.states.async_set(entity_id, "open", {ATTR_BATTERY_LEVEL: 100})
hass.states.async_set(linked_battery_charging_sensor, STATE_ON, None)
await hass.async_block_till_done()
acc = HomeAccessory(
hass,
hk_driver,
"Battery Service",
entity_id,
2,
{CONF_LINKED_BATTERY_CHARGING_SENSOR: linked_battery_charging_sensor},
)
acc.update_state = lambda x: None
assert acc.linked_battery_charging_sensor == linked_battery_charging_sensor
await acc.run_handler()
await hass.async_block_till_done()
assert acc._char_battery.value == 100
assert acc._char_low_battery.value == 0
assert acc._char_charging.value == 1
hass.states.async_set(linked_battery_charging_sensor, STATE_OFF, None)
await acc.run_handler()
await hass.async_block_till_done()
assert acc._char_charging.value == 0
hass.states.async_set(linked_battery_charging_sensor, STATE_ON, None)
await acc.run_handler()
await hass.async_block_till_done()
assert acc._char_charging.value == 1
async def test_linked_battery_sensor_and_linked_battery_charging_sensor(
hass, hk_driver, caplog
):
"""Test battery service with linked_battery_sensor and a linked_battery_charging_sensor."""
entity_id = "homekit.accessory"
linked_battery = "sensor.battery"
linked_battery_charging_sensor = "binary_sensor.battery_charging"
hass.states.async_set(entity_id, "open", {ATTR_BATTERY_LEVEL: 100})
hass.states.async_set(linked_battery, 50, None)
hass.states.async_set(linked_battery_charging_sensor, STATE_ON, None)
await hass.async_block_till_done()
acc = HomeAccessory(
hass,
hk_driver,
"Battery Service",
entity_id,
2,
{
CONF_LINKED_BATTERY_SENSOR: linked_battery,
CONF_LINKED_BATTERY_CHARGING_SENSOR: linked_battery_charging_sensor,
},
)
acc.update_state = lambda x: None
assert acc.linked_battery_sensor == linked_battery
await acc.run_handler()
await hass.async_block_till_done()
assert acc._char_battery.value == 50
assert acc._char_low_battery.value == 0
assert acc._char_charging.value == 1
hass.states.async_set(linked_battery_charging_sensor, STATE_OFF, None)
await hass.async_block_till_done()
assert acc._char_battery.value == 50
assert acc._char_low_battery.value == 0
assert acc._char_charging.value == 0
async def test_missing_linked_battery_charging_sensor(hass, hk_driver, caplog):
"""Test battery service with linked_battery_charging_sensor that is mapping to a missing entity."""
entity_id = "homekit.accessory"
linked_battery_charging_sensor = "binary_sensor.battery_charging"
hass.states.async_set(entity_id, "open", {ATTR_BATTERY_LEVEL: 100})
await hass.async_block_till_done()
acc = HomeAccessory(
hass,
hk_driver,
"Battery Service",
entity_id,
2,
{CONF_LINKED_BATTERY_CHARGING_SENSOR: linked_battery_charging_sensor},
)
assert acc.linked_battery_charging_sensor is None
async def test_missing_linked_battery_sensor(hass, hk_driver, caplog):
"""Test battery service with missing linked_battery_sensor."""
entity_id = "homekit.accessory"

View file

@ -1,10 +1,12 @@
"""Tests for the HomeKit component."""
from unittest.mock import ANY, Mock, patch
from asynctest import CoroutineMock
import pytest
from zeroconf import InterfaceChoice
from homeassistant import setup
from homeassistant.components.binary_sensor import DEVICE_CLASS_BATTERY_CHARGING
from homeassistant.components.homekit import (
MAX_DEVICES,
STATUS_READY,
@ -27,22 +29,39 @@ from homeassistant.components.homekit.const import (
SERVICE_HOMEKIT_START,
)
from homeassistant.const import (
ATTR_DEVICE_CLASS,
ATTR_ENTITY_ID,
CONF_IP_ADDRESS,
CONF_NAME,
CONF_PORT,
DEVICE_CLASS_BATTERY,
EVENT_HOMEASSISTANT_START,
EVENT_HOMEASSISTANT_STOP,
STATE_ON,
)
from homeassistant.core import State
from homeassistant.helpers import device_registry
from homeassistant.helpers.entityfilter import generate_filter
from tests.common import MockConfigEntry, mock_device_registry, mock_registry
from tests.components.homekit.common import patch_debounce
IP_ADDRESS = "127.0.0.1"
PATH_HOMEKIT = "homeassistant.components.homekit"
@pytest.fixture
def device_reg(hass):
"""Return an empty, loaded, registry."""
return mock_device_registry(hass)
@pytest.fixture
def entity_reg(hass):
"""Return an empty, loaded, registry."""
return mock_registry(hass)
@pytest.fixture(scope="module")
def debounce_patcher():
"""Patch debounce method."""
@ -66,7 +85,7 @@ async def test_setup_min(hass):
hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
await hass.async_block_till_done()
mock_homekit().start.assert_called_with(ANY)
mock_homekit().async_start.assert_called_with(ANY)
async def test_setup_auto_start_disabled(hass):
@ -83,6 +102,7 @@ async def test_setup_auto_start_disabled(hass):
with patch(f"{PATH_HOMEKIT}.HomeKit") as mock_homekit:
mock_homekit.return_value = homekit = Mock()
type(homekit).async_start = CoroutineMock()
assert await setup.async_setup_component(hass, DOMAIN, config)
mock_homekit.assert_any_call(
@ -92,23 +112,26 @@ async def test_setup_auto_start_disabled(hass):
# Test auto_start disabled
homekit.reset_mock()
homekit.async_start.reset_mock()
hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
await hass.async_block_till_done()
assert homekit.start.called is False
assert homekit.async_start.called is False
# Test start call with driver is ready
homekit.reset_mock()
homekit.async_start.reset_mock()
homekit.status = STATUS_READY
await hass.services.async_call(DOMAIN, SERVICE_HOMEKIT_START, blocking=True)
assert homekit.start.called is True
assert homekit.async_start.called is True
# Test start call with driver started
homekit.reset_mock()
homekit.async_start.reset_mock()
homekit.status = STATUS_STOPPED
await hass.services.async_call(DOMAIN, SERVICE_HOMEKIT_START, blocking=True)
assert homekit.start.called is False
assert homekit.async_start.called is False
async def test_homekit_setup(hass, hk_driver):
@ -279,6 +302,7 @@ async def test_homekit_start(hass, hk_driver, debounce_patcher):
homekit.bridge = Mock()
homekit.bridge.accessories = []
homekit.driver = hk_driver
homekit._filter = Mock(return_value=True)
hass.states.async_set("light.demo", "on")
state = hass.states.async_all()[0]
@ -290,7 +314,7 @@ async def test_homekit_start(hass, hk_driver, debounce_patcher):
) as hk_driver_add_acc, patch(
"pyhap.accessory_driver.AccessoryDriver.start"
) as hk_driver_start:
await hass.async_add_executor_job(homekit.start)
await homekit.async_start()
mock_add_acc.assert_called_with(state)
mock_setup_msg.assert_called_with(hass, pin, ANY)
@ -300,7 +324,7 @@ async def test_homekit_start(hass, hk_driver, debounce_patcher):
# Test start() if already started
hk_driver_start.reset_mock()
await hass.async_add_executor_job(homekit.start)
await homekit.async_start()
assert not hk_driver_start.called
@ -326,7 +350,7 @@ async def test_homekit_start_with_a_broken_accessory(hass, hk_driver, debounce_p
) as hk_driver_add_acc, patch(
"pyhap.accessory_driver.AccessoryDriver.start"
) as hk_driver_start:
await hass.async_add_executor_job(homekit.start)
await homekit.async_start()
mock_setup_msg.assert_called_with(hass, pin, ANY)
hk_driver_add_acc.assert_called_with(homekit.bridge)
@ -335,7 +359,7 @@ async def test_homekit_start_with_a_broken_accessory(hass, hk_driver, debounce_p
# Test start() if already started
hk_driver_start.reset_mock()
await hass.async_add_executor_job(homekit.start)
await homekit.async_start()
assert not hk_driver_start.called
@ -344,17 +368,23 @@ async def test_homekit_stop(hass):
homekit = HomeKit(hass, None, None, None, None, None, None)
homekit.driver = Mock()
assert await setup.async_setup_component(hass, DOMAIN, {DOMAIN: {}})
assert homekit.status == STATUS_READY
await hass.async_add_executor_job(homekit.stop)
await homekit.async_stop()
await hass.async_block_till_done()
homekit.status = STATUS_WAIT
await hass.async_add_executor_job(homekit.stop)
await homekit.async_stop()
await hass.async_block_till_done()
homekit.status = STATUS_STOPPED
await hass.async_add_executor_job(homekit.stop)
await homekit.async_stop()
await hass.async_block_till_done()
assert homekit.driver.stop.called is False
# Test if driver is started
homekit.status = STATUS_RUNNING
await hass.async_add_executor_job(homekit.stop)
await homekit.async_stop()
await hass.async_block_till_done()
assert homekit.driver.stop.called is True
@ -408,5 +438,74 @@ async def test_homekit_too_many_accessories(hass, hk_driver):
), patch("homeassistant.components.homekit._LOGGER.warning") as mock_warn, patch(
f"{PATH_HOMEKIT}.show_setup_message"
):
await hass.async_add_executor_job(homekit.start)
await homekit.async_start()
await hass.async_block_till_done()
assert mock_warn.called is True
async def test_homekit_finds_linked_batteries(
hass, hk_driver, debounce_patcher, device_reg, entity_reg
):
"""Test HomeKit start method."""
assert await setup.async_setup_component(hass, DOMAIN, {DOMAIN: {}})
homekit = HomeKit(hass, None, None, None, {}, {"light.demo": {}}, None, None)
homekit.driver = hk_driver
homekit._filter = Mock(return_value=True)
homekit.bridge = HomeBridge(hass, hk_driver, "mock_bridge")
config_entry = MockConfigEntry(domain="test", data={})
config_entry.add_to_hass(hass)
device_entry = device_reg.async_get_or_create(
config_entry_id=config_entry.entry_id,
connections={(device_registry.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")},
)
binary_charging_sensor = entity_reg.async_get_or_create(
"binary_sensor",
"light",
"battery_charging",
device_id=device_entry.id,
device_class=DEVICE_CLASS_BATTERY_CHARGING,
)
battery_sensor = entity_reg.async_get_or_create(
"sensor",
"light",
"battery",
device_id=device_entry.id,
device_class=DEVICE_CLASS_BATTERY,
)
light = entity_reg.async_get_or_create(
"light", "light", "demo", device_id=device_entry.id
)
hass.states.async_set(
binary_charging_sensor.entity_id,
STATE_ON,
{ATTR_DEVICE_CLASS: DEVICE_CLASS_BATTERY_CHARGING},
)
hass.states.async_set(
battery_sensor.entity_id, 30, {ATTR_DEVICE_CLASS: DEVICE_CLASS_BATTERY}
)
hass.states.async_set(light.entity_id, STATE_ON)
def _mock_get_accessory(*args, **kwargs):
return [None, "acc", None]
with patch.object(homekit.bridge, "add_accessory"), patch(
f"{PATH_HOMEKIT}.show_setup_message"
), patch(f"{PATH_HOMEKIT}.get_accessory") as mock_get_acc, patch(
"pyhap.accessory_driver.AccessoryDriver.start"
):
await homekit.async_start()
mock_get_acc.assert_called_with(
hass,
hk_driver,
ANY,
ANY,
{
"linked_battery_charging_sensor": "binary_sensor.light_battery_charging",
"linked_battery_sensor": "sensor.light_battery",
},
)

View file

@ -65,6 +65,7 @@ async def test_garage_door_open_close(hass, hk_driver, cls, events):
await hass.async_block_till_done()
acc = cls.garage(hass, hk_driver, "Garage Door", entity_id, 2, None)
await acc.run_handler()
await hass.async_block_till_done()
assert acc.aid == 2
assert acc.category == 4 # GarageDoorOpener
@ -143,6 +144,7 @@ async def test_window_set_cover_position(hass, hk_driver, cls, events):
await hass.async_block_till_done()
acc = cls.window(hass, hk_driver, "Cover", entity_id, 2, None)
await acc.run_handler()
await hass.async_block_till_done()
assert acc.aid == 2
assert acc.category == 14 # WindowCovering
@ -214,6 +216,7 @@ async def test_window_cover_set_tilt(hass, hk_driver, cls, events):
await hass.async_block_till_done()
acc = cls.window(hass, hk_driver, "Cover", entity_id, 2, None)
await acc.run_handler()
await hass.async_block_till_done()
assert acc.aid == 2
assert acc.category == 14 # CATEGORY_WINDOW_COVERING
@ -277,6 +280,7 @@ async def test_window_open_close(hass, hk_driver, cls, events):
hass.states.async_set(entity_id, STATE_UNKNOWN, {ATTR_SUPPORTED_FEATURES: 0})
acc = cls.window_basic(hass, hk_driver, "Cover", entity_id, 2, None)
await acc.run_handler()
await hass.async_block_till_done()
assert acc.aid == 2
assert acc.category == 14 # WindowCovering
@ -359,6 +363,7 @@ async def test_window_open_close_stop(hass, hk_driver, cls, events):
)
acc = cls.window_basic(hass, hk_driver, "Cover", entity_id, 2, None)
await acc.run_handler()
await hass.async_block_till_done()
# Set from HomeKit
call_close_cover = async_mock_service(hass, DOMAIN, "close_cover")
@ -407,10 +412,15 @@ async def test_window_open_close_with_position_and_stop(hass, hk_driver, cls, ev
)
acc = cls.window(hass, hk_driver, "Cover", entity_id, 2, None)
await acc.run_handler()
await hass.async_block_till_done()
# Set from HomeKit
call_stop_cover = async_mock_service(hass, DOMAIN, "stop_cover")
await hass.async_add_executor_job(acc.char_hold_position.client_update_value, 0)
await hass.async_block_till_done()
assert not call_stop_cover
await hass.async_add_executor_job(acc.char_hold_position.client_update_value, 1)
await hass.async_block_till_done()
assert call_stop_cover

View file

@ -286,6 +286,8 @@ async def test_fan_speed(hass, hk_driver, cls, events):
assert acc.char_speed.value != 0
await acc.run_handler()
await hass.async_block_till_done()
assert (
acc.speed_mapping.speed_ranges == HomeKitSpeedMapping(speed_list).speed_ranges
)
@ -351,6 +353,8 @@ async def test_fan_set_all_one_shot(hass, hk_driver, cls, events):
# speed to 100 when turning on a fan on a freshly booted up server.
assert acc.char_speed.value != 0
await acc.run_handler()
await hass.async_block_till_done()
assert (
acc.speed_mapping.speed_ranges == HomeKitSpeedMapping(speed_list).speed_ranges
)

View file

@ -58,6 +58,7 @@ async def test_media_player_set_state(hass, hk_driver, events):
await hass.async_block_till_done()
acc = MediaPlayer(hass, hk_driver, "MediaPlayer", entity_id, 2, config)
await acc.run_handler()
await hass.async_block_till_done()
assert acc.aid == 2
assert acc.category == 8 # Switch
@ -199,6 +200,7 @@ async def test_media_player_television(hass, hk_driver, events, caplog):
await hass.async_block_till_done()
acc = TelevisionMediaPlayer(hass, hk_driver, "MediaPlayer", entity_id, 2, None)
await acc.run_handler()
await hass.async_block_till_done()
assert acc.aid == 2
assert acc.category == 31 # Television

View file

@ -41,6 +41,7 @@ async def test_temperature(hass, hk_driver):
await hass.async_block_till_done()
acc = TemperatureSensor(hass, hk_driver, "Temperature", entity_id, 2, None)
await acc.run_handler()
await hass.async_block_till_done()
assert acc.aid == 2
assert acc.category == 10 # Sensor
@ -74,6 +75,7 @@ async def test_humidity(hass, hk_driver):
await hass.async_block_till_done()
acc = HumiditySensor(hass, hk_driver, "Humidity", entity_id, 2, None)
await acc.run_handler()
await hass.async_block_till_done()
assert acc.aid == 2
assert acc.category == 10 # Sensor
@ -97,6 +99,7 @@ async def test_air_quality(hass, hk_driver):
await hass.async_block_till_done()
acc = AirQualitySensor(hass, hk_driver, "Air Quality", entity_id, 2, None)
await acc.run_handler()
await hass.async_block_till_done()
assert acc.aid == 2
assert acc.category == 10 # Sensor
@ -128,6 +131,7 @@ async def test_co(hass, hk_driver):
await hass.async_block_till_done()
acc = CarbonMonoxideSensor(hass, hk_driver, "CO", entity_id, 2, None)
await acc.run_handler()
await hass.async_block_till_done()
assert acc.aid == 2
assert acc.category == 10 # Sensor
@ -167,6 +171,7 @@ async def test_co2(hass, hk_driver):
await hass.async_block_till_done()
acc = CarbonDioxideSensor(hass, hk_driver, "CO2", entity_id, 2, None)
await acc.run_handler()
await hass.async_block_till_done()
assert acc.aid == 2
assert acc.category == 10 # Sensor
@ -206,6 +211,7 @@ async def test_light(hass, hk_driver):
await hass.async_block_till_done()
acc = LightSensor(hass, hk_driver, "Light", entity_id, 2, None)
await acc.run_handler()
await hass.async_block_till_done()
assert acc.aid == 2
assert acc.category == 10 # Sensor
@ -230,6 +236,7 @@ async def test_binary(hass, hk_driver):
acc = BinarySensor(hass, hk_driver, "Window Opening", entity_id, 2, None)
await acc.run_handler()
await hass.async_block_till_done()
assert acc.aid == 2
assert acc.category == 10 # Sensor
@ -268,6 +275,7 @@ async def test_motion_uses_bool(hass, hk_driver):
acc = BinarySensor(hass, hk_driver, "Motion Sensor", entity_id, 2, None)
await acc.run_handler()
await hass.async_block_till_done()
assert acc.aid == 2
assert acc.category == 10 # Sensor

View file

@ -561,3 +561,82 @@ async def test_restore_states(hass):
assert hass.states.get("light.simple") is None
assert hass.states.get("light.disabled") is None
assert hass.states.get("light.all_info_set") is None
async def test_async_get_device_class_lookup(hass):
"""Test registry device class lookup."""
hass.state = CoreState.not_running
ent_reg = await entity_registry.async_get_registry(hass)
ent_reg.async_get_or_create(
"binary_sensor",
"light",
"battery_charging",
device_id="light_device_entry_id",
device_class="battery_charging",
)
ent_reg.async_get_or_create(
"sensor",
"light",
"battery",
device_id="light_device_entry_id",
device_class="battery",
)
ent_reg.async_get_or_create(
"light", "light", "demo", device_id="light_device_entry_id"
)
ent_reg.async_get_or_create(
"binary_sensor",
"vacuum",
"battery_charging",
device_id="vacuum_device_entry_id",
device_class="battery_charging",
)
ent_reg.async_get_or_create(
"sensor",
"vacuum",
"battery",
device_id="vacuum_device_entry_id",
device_class="battery",
)
ent_reg.async_get_or_create(
"vacuum", "vacuum", "demo", device_id="vacuum_device_entry_id"
)
ent_reg.async_get_or_create(
"binary_sensor",
"remote",
"battery_charging",
device_id="remote_device_entry_id",
device_class="battery_charging",
)
ent_reg.async_get_or_create(
"remote", "remote", "demo", device_id="remote_device_entry_id"
)
device_lookup = ent_reg.async_get_device_class_lookup(
{("binary_sensor", "battery_charging"), ("sensor", "battery")}
)
assert device_lookup == {
"remote_device_entry_id": {
(
"binary_sensor",
"battery_charging",
): "binary_sensor.remote_battery_charging"
},
"light_device_entry_id": {
(
"binary_sensor",
"battery_charging",
): "binary_sensor.light_battery_charging",
("sensor", "battery"): "sensor.light_battery",
},
"vacuum_device_entry_id": {
(
"binary_sensor",
"battery_charging",
): "binary_sensor.vacuum_battery_charging",
("sensor", "battery"): "sensor.vacuum_battery",
},
}