Fix state for Matter Locks (including optional door sensor) (#121665)

This commit is contained in:
Marcel van der Veldt 2024-07-10 10:32:42 +02:00 committed by GitHub
parent e9b7cc1eba
commit 6702d232e2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 62 additions and 27 deletions

View file

@ -145,4 +145,20 @@ DISCOVERY_SCHEMAS = [
required_attributes=(clusters.BooleanState.Attributes.StateValue,),
device_type=(device_types.RainSensor,),
),
MatterDiscoverySchema(
platform=Platform.BINARY_SENSOR,
entity_description=MatterBinarySensorEntityDescription(
key="LockDoorStateSensor",
device_class=BinarySensorDeviceClass.DOOR,
# pylint: disable=unnecessary-lambda
measurement_to_ha=lambda x: {
clusters.DoorLock.Enums.DoorStateEnum.kDoorOpen: True,
clusters.DoorLock.Enums.DoorStateEnum.kDoorJammed: True,
clusters.DoorLock.Enums.DoorStateEnum.kDoorForcedOpen: True,
clusters.DoorLock.Enums.DoorStateEnum.kDoorClosed: False,
}.get(x),
),
entity_class=MatterBinarySensor,
required_attributes=(clusters.DoorLock.Attributes.DoorState,),
),
]

View file

@ -2,6 +2,7 @@
from __future__ import annotations
import asyncio
from typing import Any
from chip.clusters import Objects as clusters
@ -38,6 +39,7 @@ class MatterLock(MatterEntity, LockEntity):
"""Representation of a Matter lock."""
features: int | None = None
_optimistic_timer: asyncio.TimerHandle | None = None
@property
def code_format(self) -> str | None:
@ -90,9 +92,15 @@ class MatterLock(MatterEntity, LockEntity):
async def async_lock(self, **kwargs: Any) -> None:
"""Lock the lock with pin if needed."""
# optimistically signal locking to state machine
self._attr_is_locking = True
self.async_write_ha_state()
if not self._attr_is_locked:
# optimistically signal locking to state machine
self._attr_is_locking = True
self.async_write_ha_state()
# the lock should acknowledge the command with an attribute update
# but bad things may happen, so guard against it with a timer.
self._optimistic_timer = self.hass.loop.call_later(
5, self._reset_optimistic_state
)
code: str | None = kwargs.get(ATTR_CODE)
code_bytes = code.encode() if code else None
await self.send_device_command(
@ -101,9 +109,15 @@ class MatterLock(MatterEntity, LockEntity):
async def async_unlock(self, **kwargs: Any) -> None:
"""Unlock the lock with pin if needed."""
# optimistically signal unlocking to state machine
self._attr_is_unlocking = True
self.async_write_ha_state()
if self._attr_is_locked:
# optimistically signal unlocking to state machine
self._attr_is_unlocking = True
self.async_write_ha_state()
# the lock should acknowledge the command with an attribute update
# but bad things may happen, so guard against it with a timer.
self._optimistic_timer = self.hass.loop.call_later(
5, self._reset_optimistic_state
)
code: str | None = kwargs.get(ATTR_CODE)
code_bytes = code.encode() if code else None
if self.supports_unbolt:
@ -120,9 +134,14 @@ class MatterLock(MatterEntity, LockEntity):
async def async_open(self, **kwargs: Any) -> None:
"""Open the door latch."""
# optimistically signal unlocking to state machine
self._attr_is_unlocking = True
# optimistically signal opening to state machine
self._attr_is_opening = True
self.async_write_ha_state()
# the lock should acknowledge the command with an attribute update
# but bad things may happen, so guard against it with a timer.
self._optimistic_timer = self.hass.loop.call_later(
5, self._reset_optimistic_state
)
code: str | None = kwargs.get(ATTR_CODE)
code_bytes = code.encode() if code else None
await self.send_device_command(
@ -145,38 +164,38 @@ class MatterLock(MatterEntity, LockEntity):
)
# always reset the optimisically (un)locking state on state update
self._attr_is_locking = False
self._attr_is_unlocking = False
self._reset_optimistic_state(write_state=False)
LOGGER.debug("Lock state: %s for %s", lock_state, self.entity_id)
if lock_state is clusters.DoorLock.Enums.DlLockState.kUnlatched:
self._attr_is_locked = False
self._attr_is_open = True
if lock_state is clusters.DoorLock.Enums.DlLockState.kLocked:
self._attr_is_locked = True
self._attr_is_open = False
elif lock_state in (
clusters.DoorLock.Enums.DlLockState.kUnlocked,
clusters.DoorLock.Enums.DlLockState.kUnlatched,
clusters.DoorLock.Enums.DlLockState.kNotFullyLocked,
):
self._attr_is_locked = False
self._attr_is_open = False
else:
# According to the matter docs a null state can happen during device startup.
# Treat any other state as unknown.
# NOTE: A null state can happen during device startup.
self._attr_is_locked = None
self._attr_is_open = None
if self.supports_door_position_sensor:
door_state = self.get_matter_attribute_value(
clusters.DoorLock.Attributes.DoorState
)
assert door_state is not None
LOGGER.debug("Door state: %s for %s", door_state, self.entity_id)
self._attr_is_jammed = (
door_state is clusters.DoorLock.Enums.DoorStateEnum.kDoorJammed
)
self._attr_is_open = (
door_state is clusters.DoorLock.Enums.DoorStateEnum.kDoorOpen
)
@callback
def _reset_optimistic_state(self, write_state: bool = True) -> None:
if self._optimistic_timer and not self._optimistic_timer.cancelled():
self._optimistic_timer.cancel()
self._optimistic_timer = None
self._attr_is_locking = False
self._attr_is_unlocking = False
self._attr_is_opening = False
if write_state:
self.async_write_ha_state()
DISCOVERY_SCHEMAS = [