hass-core/homeassistant/components/matter/lock.py
2024-07-10 11:20:26 +02:00

211 lines
7.6 KiB
Python

"""Matter lock."""
from __future__ import annotations
import asyncio
from typing import Any
from chip.clusters import Objects as clusters
from homeassistant.components.lock import (
LockEntity,
LockEntityDescription,
LockEntityFeature,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import ATTR_CODE, Platform
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from .const import LOGGER
from .entity import MatterEntity
from .helpers import get_matter
from .models import MatterDiscoverySchema
DoorLockFeature = clusters.DoorLock.Bitmaps.Feature
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up Matter lock from Config Entry."""
matter = get_matter(hass)
matter.register_platform_handler(Platform.LOCK, async_add_entities)
class MatterLock(MatterEntity, LockEntity):
"""Representation of a Matter lock."""
features: int | None = None
_optimistic_timer: asyncio.TimerHandle | None = None
@property
def code_format(self) -> str | None:
"""Regex for code format or None if no code is required."""
if self.get_matter_attribute_value(
clusters.DoorLock.Attributes.RequirePINforRemoteOperation
):
min_pincode_length = int(
self.get_matter_attribute_value(
clusters.DoorLock.Attributes.MinPINCodeLength
)
)
max_pincode_length = int(
self.get_matter_attribute_value(
clusters.DoorLock.Attributes.MaxPINCodeLength
)
)
return f"^\\d{{{min_pincode_length},{max_pincode_length}}}$"
return None
@property
def supports_door_position_sensor(self) -> bool:
"""Return True if the lock supports door position sensor."""
if self.features is None:
return False
return bool(self.features & DoorLockFeature.kDoorPositionSensor)
@property
def supports_unbolt(self) -> bool:
"""Return True if the lock supports unbolt."""
if self.features is None:
return False
return bool(self.features & DoorLockFeature.kUnbolt)
async def send_device_command(
self,
command: clusters.ClusterCommand,
timed_request_timeout_ms: int = 1000,
) -> None:
"""Send a command to the device."""
await self.matter_client.send_device_command(
node_id=self._endpoint.node.node_id,
endpoint_id=self._endpoint.endpoint_id,
command=command,
timed_request_timeout_ms=timed_request_timeout_ms,
)
async def async_lock(self, **kwargs: Any) -> None:
"""Lock the lock with pin if needed."""
if not self._attr_is_locked:
# optimistically signal locking to state machine
self._attr_is_locking = True
self.async_write_ha_state()
# the lock should acknowledge the command with an attribute update
# but bad things may happen, so guard against it with a timer.
self._optimistic_timer = self.hass.loop.call_later(
30, self._reset_optimistic_state
)
code: str | None = kwargs.get(ATTR_CODE)
code_bytes = code.encode() if code else None
await self.send_device_command(
command=clusters.DoorLock.Commands.LockDoor(code_bytes)
)
async def async_unlock(self, **kwargs: Any) -> None:
"""Unlock the lock with pin if needed."""
if self._attr_is_locked:
# optimistically signal unlocking to state machine
self._attr_is_unlocking = True
self.async_write_ha_state()
# the lock should acknowledge the command with an attribute update
# but bad things may happen, so guard against it with a timer.
self._optimistic_timer = self.hass.loop.call_later(
30, self._reset_optimistic_state
)
code: str | None = kwargs.get(ATTR_CODE)
code_bytes = code.encode() if code else None
if self.supports_unbolt:
# if the lock reports it has separate unbolt support,
# the unlock command should unbolt only on the unlock command
# and unlatch on the HA 'open' command.
await self.send_device_command(
command=clusters.DoorLock.Commands.UnboltDoor(code_bytes)
)
else:
await self.send_device_command(
command=clusters.DoorLock.Commands.UnlockDoor(code_bytes)
)
async def async_open(self, **kwargs: Any) -> None:
"""Open the door latch."""
# optimistically signal opening to state machine
self._attr_is_opening = True
self.async_write_ha_state()
# the lock should acknowledge the command with an attribute update
# but bad things may happen, so guard against it with a timer.
self._optimistic_timer = self.hass.loop.call_later(
30 if self._attr_is_locked else 5, self._reset_optimistic_state
)
code: str | None = kwargs.get(ATTR_CODE)
code_bytes = code.encode() if code else None
await self.send_device_command(
command=clusters.DoorLock.Commands.UnlockDoor(code_bytes)
)
@callback
def _update_from_device(self) -> None:
"""Update the entity from the device."""
if self.features is None:
self.features = int(
self.get_matter_attribute_value(clusters.DoorLock.Attributes.FeatureMap)
)
if self.supports_unbolt:
self._attr_supported_features = LockEntityFeature.OPEN
lock_state = self.get_matter_attribute_value(
clusters.DoorLock.Attributes.LockState
)
# always reset the optimisically (un)locking state on state update
self._reset_optimistic_state(write_state=False)
LOGGER.debug("Lock state: %s for %s", lock_state, self.entity_id)
if lock_state is clusters.DoorLock.Enums.DlLockState.kUnlatched:
self._attr_is_locked = False
self._attr_is_open = True
if lock_state is clusters.DoorLock.Enums.DlLockState.kLocked:
self._attr_is_locked = True
self._attr_is_open = False
elif lock_state in (
clusters.DoorLock.Enums.DlLockState.kUnlocked,
clusters.DoorLock.Enums.DlLockState.kNotFullyLocked,
):
self._attr_is_locked = False
self._attr_is_open = False
else:
# Treat any other state as unknown.
# NOTE: A null state can happen during device startup.
self._attr_is_locked = None
self._attr_is_open = None
@callback
def _reset_optimistic_state(self, write_state: bool = True) -> None:
if self._optimistic_timer and not self._optimistic_timer.cancelled():
self._optimistic_timer.cancel()
self._optimistic_timer = None
self._attr_is_locking = False
self._attr_is_unlocking = False
self._attr_is_opening = False
if write_state:
self.async_write_ha_state()
DISCOVERY_SCHEMAS = [
MatterDiscoverySchema(
platform=Platform.LOCK,
entity_description=LockEntityDescription(
key="MatterLock", translation_key="lock"
),
entity_class=MatterLock,
required_attributes=(clusters.DoorLock.Attributes.LockState,),
optional_attributes=(clusters.DoorLock.Attributes.DoorState,),
),
]