Small cleanups for HomeKit Controller (#81933)

This commit is contained in:
J. Nick Koston 2022-11-10 17:07:30 -06:00 committed by GitHub
parent f67ecd8ef5
commit 5621dfe419
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -2,7 +2,7 @@
from __future__ import annotations
import asyncio
from collections.abc import Callable
from collections.abc import Callable, Iterable
from datetime import timedelta
import logging
from types import MappingProxyType
@ -110,10 +110,6 @@ class HKDevice:
self.signal_state_updated = "_".join((DOMAIN, self.unique_id, "state_updated"))
# Current values of all characteristics homekit_controller is tracking.
# Key is a (accessory_id, characteristic_id) tuple.
self.current_state: dict[tuple[int, int], Any] = {}
self.pollable_characteristics: list[tuple[int, int]] = []
# If this is set polling is active and can be disabled by calling
@ -685,28 +681,26 @@ class HKDevice:
_LOGGER.debug("Finished HomeKit controller update: %s", self.unique_id)
def process_new_events(self, new_values_dict) -> None:
def process_new_events(
self, new_values_dict: dict[tuple[int, int], dict[str, Any]]
) -> None:
"""Process events from accessory into HA state."""
self.async_set_available_state(True)
# Process any stateless events (via device_triggers)
async_fire_triggers(self, new_values_dict)
for (aid, cid), value in new_values_dict.items():
accessory = self.current_state.setdefault(aid, {})
accessory[cid] = value
# self.current_state will be replaced by entity_map in a future PR
# For now we update both
self.entity_map.process_changes(new_values_dict)
async_dispatcher_send(self.hass, self.signal_state_updated)
async def get_characteristics(self, *args, **kwargs) -> dict[str, Any]:
async def get_characteristics(self, *args: Any, **kwargs: Any) -> dict[str, Any]:
"""Read latest state from homekit accessory."""
return await self.pairing.get_characteristics(*args, **kwargs)
async def put_characteristics(self, characteristics) -> None:
async def put_characteristics(
self, characteristics: Iterable[tuple[int, int, Any]]
) -> None:
"""Control a HomeKit device state from Home Assistant."""
results = await self.pairing.put_characteristics(characteristics)