Schlage: Set the changed by attribute on locks based on log messages (#97469)

This commit is contained in:
David Knowles 2023-08-07 12:07:48 -04:00 committed by GitHub
parent 4089bd43da
commit a4721e9b36
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 93 additions and 9 deletions

View file

@ -1,10 +1,12 @@
"""DataUpdateCoordinator for the Schlage integration."""
from __future__ import annotations
import asyncio
from dataclasses import dataclass
from pyschlage import Lock, Schlage
from pyschlage.exceptions import Error
from pyschlage.exceptions import Error as SchlageError
from pyschlage.log import LockLog
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
@ -12,11 +14,19 @@ from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, Upda
from .const import DOMAIN, LOGGER, UPDATE_INTERVAL
@dataclass
class LockData:
"""Container for cached lock data from the Schlage API."""
lock: Lock
logs: list[LockLog]
@dataclass
class SchlageData:
"""Container for cached data from the Schlage API."""
locks: dict[str, Lock]
locks: dict[str, LockData]
class SchlageDataUpdateCoordinator(DataUpdateCoordinator[SchlageData]):
@ -32,10 +42,29 @@ class SchlageDataUpdateCoordinator(DataUpdateCoordinator[SchlageData]):
async def _async_update_data(self) -> SchlageData:
"""Fetch the latest data from the Schlage API."""
try:
return await self.hass.async_add_executor_job(self._update_data)
except Error as ex:
locks = await self.hass.async_add_executor_job(self.api.locks)
except SchlageError as ex:
raise UpdateFailed("Failed to refresh Schlage data") from ex
lock_data = await asyncio.gather(
*(
self.hass.async_add_executor_job(self._get_lock_data, lock)
for lock in locks
)
)
return SchlageData(
locks={ld.lock.device_id: ld for ld in lock_data},
)
def _update_data(self) -> SchlageData:
"""Fetch the latest data from the Schlage API."""
return SchlageData(locks={lock.device_id: lock for lock in self.api.locks()})
def _get_lock_data(self, lock: Lock) -> LockData:
logs: list[LockLog] = []
previous_lock_data = None
if self.data and (previous_lock_data := self.data.locks.get(lock.device_id)):
# Default to the previous data, in case a refresh fails.
# It's not critical if we don't have the freshest data.
logs = previous_lock_data.logs
try:
logs = lock.logs()
except SchlageError as ex:
LOGGER.debug('Failed to read logs for lock "%s": %s', lock.name, ex)
return LockData(lock=lock, logs=logs)

View file

@ -6,7 +6,7 @@ from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import DOMAIN, MANUFACTURER
from .coordinator import SchlageDataUpdateCoordinator
from .coordinator import LockData, SchlageDataUpdateCoordinator
class SchlageEntity(CoordinatorEntity[SchlageDataUpdateCoordinator]):
@ -29,10 +29,15 @@ class SchlageEntity(CoordinatorEntity[SchlageDataUpdateCoordinator]):
sw_version=self._lock.firmware_version,
)
@property
def _lock_data(self) -> LockData:
"""Fetch the LockData from our coordinator."""
return self.coordinator.data.locks[self.device_id]
@property
def _lock(self) -> Lock:
"""Fetch the Schlage lock from our coordinator."""
return self.coordinator.data.locks[self.device_id]
return self._lock_data.lock
@property
def available(self) -> bool:

View file

@ -48,6 +48,11 @@ class SchlageLockEntity(SchlageEntity, LockEntity):
"""Update our internal state attributes."""
self._attr_is_locked = self._lock.is_locked
self._attr_is_jammed = self._lock.is_jammed
# Only update changed_by if we get a valid value. This way a previous
# value will stay intact if the latest log message isn't related to a
# lock state change.
if changed_by := self._lock.last_changed_by(self._lock_data.logs):
self._attr_changed_by = changed_by
async def async_lock(self, **kwargs: Any) -> None:
"""Lock the device."""

View file

@ -36,6 +36,7 @@ async def mock_added_config_entry(
) -> MockConfigEntry:
"""Mock ConfigEntry that's been added to HA."""
mock_schlage.locks.return_value = [mock_lock]
mock_schlage.users.return_value = []
mock_config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(mock_config_entry.entry_id)
await hass.async_block_till_done()
@ -80,4 +81,6 @@ def mock_lock():
battery_level=20,
firmware_version="1.0",
)
mock_lock.logs.return_value = []
mock_lock.last_changed_by.return_value = "thumbturn"
return mock_lock

View file

@ -1,11 +1,18 @@
"""Test schlage lock."""
from datetime import timedelta
from unittest.mock import Mock
from pyschlage.exceptions import UnknownError
from homeassistant.components.lock import DOMAIN as LOCK_DOMAIN
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import ATTR_ENTITY_ID, SERVICE_LOCK, SERVICE_UNLOCK
from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry as dr
from homeassistant.util.dt import utcnow
from tests.common import async_fire_time_changed
async def test_lock_device_registry(
@ -43,3 +50,38 @@ async def test_lock_services(
mock_lock.unlock.assert_called_once_with()
await hass.config_entries.async_unload(mock_added_config_entry.entry_id)
async def test_changed_by(
hass: HomeAssistant, mock_lock: Mock, mock_added_config_entry: ConfigEntry
) -> None:
"""Test population of the changed_by attribute."""
mock_lock.last_changed_by.reset_mock()
mock_lock.last_changed_by.return_value = "access code - foo"
# Make the coordinator refresh data.
async_fire_time_changed(hass, utcnow() + timedelta(seconds=31))
await hass.async_block_till_done()
mock_lock.last_changed_by.assert_called_once_with([])
lock_device = hass.states.get("lock.vault_door")
assert lock_device is not None
assert lock_device.attributes.get("changed_by") == "access code - foo"
async def test_changed_by_uses_previous_logs_on_failure(
hass: HomeAssistant, mock_lock: Mock, mock_added_config_entry: ConfigEntry
) -> None:
"""Test that a failure to load logs is not terminal."""
mock_lock.last_changed_by.reset_mock()
mock_lock.last_changed_by.return_value = "thumbturn"
mock_lock.logs.side_effect = UnknownError("Cannot load logs")
# Make the coordinator refresh data.
async_fire_time_changed(hass, utcnow() + timedelta(seconds=31))
await hass.async_block_till_done()
mock_lock.last_changed_by.assert_called_once_with([])
lock_device = hass.states.get("lock.vault_door")
assert lock_device is not None
assert lock_device.attributes.get("changed_by") == "thumbturn"