Move risco coordinator to separate module (#117549)

This commit is contained in:
epenet 2024-05-16 13:29:57 +02:00 committed by GitHub
parent 388132cfc8
commit 59645aeb0f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 97 additions and 86 deletions

View file

@ -4,19 +4,10 @@ from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass, field
from datetime import timedelta
import logging
from typing import Any
from pyrisco import (
CannotConnectError,
OperationError,
RiscoCloud,
RiscoLocal,
UnauthorizedError,
)
from pyrisco.cloud.alarm import Alarm
from pyrisco.cloud.event import Event
from pyrisco import CannotConnectError, RiscoCloud, RiscoLocal, UnauthorizedError
from pyrisco.common import Partition, System, Zone
from homeassistant.config_entries import ConfigEntry
@ -34,8 +25,6 @@ from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.helpers.storage import Store
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import (
CONF_CONCURRENCY,
@ -47,6 +36,7 @@ from .const import (
SYSTEM_UPDATE_SIGNAL,
TYPE_LOCAL,
)
from .coordinator import RiscoDataUpdateCoordinator, RiscoEventsDataUpdateCoordinator
PLATFORMS = [
Platform.ALARM_CONTROL_PANEL,
@ -54,8 +44,6 @@ PLATFORMS = [
Platform.SENSOR,
Platform.SWITCH,
]
LAST_EVENT_STORAGE_VERSION = 1
LAST_EVENT_TIMESTAMP_KEY = "last_event_timestamp"
_LOGGER = logging.getLogger(__name__)
@ -190,63 +178,3 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
async def _update_listener(hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Handle options update."""
await hass.config_entries.async_reload(entry.entry_id)
class RiscoDataUpdateCoordinator(DataUpdateCoordinator[Alarm]): # pylint: disable=hass-enforce-coordinator-module
"""Class to manage fetching risco data."""
def __init__(
self, hass: HomeAssistant, risco: RiscoCloud, scan_interval: int
) -> None:
"""Initialize global risco data updater."""
self.risco = risco
interval = timedelta(seconds=scan_interval)
super().__init__(
hass,
_LOGGER,
name=DOMAIN,
update_interval=interval,
)
async def _async_update_data(self) -> Alarm:
"""Fetch data from risco."""
try:
return await self.risco.get_state()
except (CannotConnectError, UnauthorizedError, OperationError) as error:
raise UpdateFailed(error) from error
class RiscoEventsDataUpdateCoordinator(DataUpdateCoordinator[list[Event]]): # pylint: disable=hass-enforce-coordinator-module
"""Class to manage fetching risco data."""
def __init__(
self, hass: HomeAssistant, risco: RiscoCloud, eid: str, scan_interval: int
) -> None:
"""Initialize global risco data updater."""
self.risco = risco
self._store = Store[dict[str, Any]](
hass, LAST_EVENT_STORAGE_VERSION, f"risco_{eid}_last_event_timestamp"
)
interval = timedelta(seconds=scan_interval)
super().__init__(
hass,
_LOGGER,
name=f"{DOMAIN}_events",
update_interval=interval,
)
async def _async_update_data(self) -> list[Event]:
"""Fetch data from risco."""
last_store = await self._store.async_load() or {}
last_timestamp = last_store.get(
LAST_EVENT_TIMESTAMP_KEY, "2020-01-01T00:00:00Z"
)
try:
events = await self.risco.get_events(last_timestamp, 10)
except (CannotConnectError, UnauthorizedError, OperationError) as error:
raise UpdateFailed(error) from error
if len(events) > 0:
await self._store.async_save({LAST_EVENT_TIMESTAMP_KEY: events[0].time})
return events

View file

@ -29,7 +29,7 @@ from homeassistant.core import HomeAssistant
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import LocalData, RiscoDataUpdateCoordinator, is_local
from . import LocalData, is_local
from .const import (
CONF_CODE_ARM_REQUIRED,
CONF_CODE_DISARM_REQUIRED,
@ -42,6 +42,7 @@ from .const import (
RISCO_GROUPS,
RISCO_PARTIAL_ARM,
)
from .coordinator import RiscoDataUpdateCoordinator
from .entity import RiscoCloudEntity
_LOGGER = logging.getLogger(__name__)

View file

@ -21,8 +21,9 @@ from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import LocalData, RiscoDataUpdateCoordinator, is_local
from . import LocalData, is_local
from .const import DATA_COORDINATOR, DOMAIN, SYSTEM_UPDATE_SIGNAL
from .coordinator import RiscoDataUpdateCoordinator
from .entity import RiscoCloudZoneEntity, RiscoLocalZoneEntity
SYSTEM_ENTITY_DESCRIPTIONS = [

View file

@ -0,0 +1,81 @@
"""Coordinator for the Risco integration."""
from __future__ import annotations
from datetime import timedelta
import logging
from typing import Any
from pyrisco import CannotConnectError, OperationError, RiscoCloud, UnauthorizedError
from pyrisco.cloud.alarm import Alarm
from pyrisco.cloud.event import Event
from homeassistant.core import HomeAssistant
from homeassistant.helpers.storage import Store
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import DOMAIN
LAST_EVENT_STORAGE_VERSION = 1
LAST_EVENT_TIMESTAMP_KEY = "last_event_timestamp"
_LOGGER = logging.getLogger(__name__)
class RiscoDataUpdateCoordinator(DataUpdateCoordinator[Alarm]):
"""Class to manage fetching risco data."""
def __init__(
self, hass: HomeAssistant, risco: RiscoCloud, scan_interval: int
) -> None:
"""Initialize global risco data updater."""
self.risco = risco
interval = timedelta(seconds=scan_interval)
super().__init__(
hass,
_LOGGER,
name=DOMAIN,
update_interval=interval,
)
async def _async_update_data(self) -> Alarm:
"""Fetch data from risco."""
try:
return await self.risco.get_state()
except (CannotConnectError, UnauthorizedError, OperationError) as error:
raise UpdateFailed(error) from error
class RiscoEventsDataUpdateCoordinator(DataUpdateCoordinator[list[Event]]):
"""Class to manage fetching risco data."""
def __init__(
self, hass: HomeAssistant, risco: RiscoCloud, eid: str, scan_interval: int
) -> None:
"""Initialize global risco data updater."""
self.risco = risco
self._store = Store[dict[str, Any]](
hass, LAST_EVENT_STORAGE_VERSION, f"risco_{eid}_last_event_timestamp"
)
interval = timedelta(seconds=scan_interval)
super().__init__(
hass,
_LOGGER,
name=f"{DOMAIN}_events",
update_interval=interval,
)
async def _async_update_data(self) -> list[Event]:
"""Fetch data from risco."""
last_store = await self._store.async_load() or {}
last_timestamp = last_store.get(
LAST_EVENT_TIMESTAMP_KEY, "2020-01-01T00:00:00Z"
)
try:
events = await self.risco.get_events(last_timestamp, 10)
except (CannotConnectError, UnauthorizedError, OperationError) as error:
raise UpdateFailed(error) from error
if len(events) > 0:
await self._store.async_save({LAST_EVENT_TIMESTAMP_KEY: events[0].time})
return events

View file

@ -13,8 +13,9 @@ from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from . import RiscoDataUpdateCoordinator, zone_update_signal
from . import zone_update_signal
from .const import DOMAIN
from .coordinator import RiscoDataUpdateCoordinator
def zone_unique_id(risco: RiscoCloud, zone_id: int) -> str:

View file

@ -17,8 +17,9 @@ from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from homeassistant.util import dt as dt_util
from . import RiscoEventsDataUpdateCoordinator, is_local
from . import is_local
from .const import DOMAIN, EVENTS_COORDINATOR
from .coordinator import RiscoEventsDataUpdateCoordinator
from .entity import zone_unique_id
CATEGORIES = {

View file

@ -12,8 +12,9 @@ from homeassistant.const import EntityCategory
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import LocalData, RiscoDataUpdateCoordinator, is_local
from . import LocalData, is_local
from .const import DATA_COORDINATOR, DOMAIN
from .coordinator import RiscoDataUpdateCoordinator
from .entity import RiscoCloudZoneEntity, RiscoLocalZoneEntity

View file

@ -5,11 +5,8 @@ from unittest.mock import MagicMock, PropertyMock, patch
import pytest
from homeassistant.components.risco import (
LAST_EVENT_TIMESTAMP_KEY,
CannotConnectError,
UnauthorizedError,
)
from homeassistant.components.risco import CannotConnectError, UnauthorizedError
from homeassistant.components.risco.coordinator import LAST_EVENT_TIMESTAMP_KEY
from homeassistant.core import HomeAssistant
from homeassistant.helpers import entity_registry as er
from homeassistant.util import dt as dt_util
@ -169,7 +166,7 @@ def _set_utc_time_zone(hass):
def save_mock():
"""Create a mock for async_save."""
with patch(
"homeassistant.components.risco.Store.async_save",
"homeassistant.components.risco.coordinator.Store.async_save",
) as save_mock:
yield save_mock
@ -196,7 +193,7 @@ async def test_cloud_setup(
"homeassistant.components.risco.RiscoCloud.get_events", return_value=[]
) as events_mock,
patch(
"homeassistant.components.risco.Store.async_load",
"homeassistant.components.risco.coordinator.Store.async_load",
return_value={LAST_EVENT_TIMESTAMP_KEY: TEST_EVENTS[0].time},
),
):