241 lines
6.8 KiB
241 lines
6.8 KiB
"""Test helpers for UniFi Protect."""
# pylint: disable=protected-access
from __future__ import annotations
from collections.abc import Sequence
from dataclasses import dataclass
from datetime import timedelta
from typing import Any, Callable
from unittest.mock import Mock
from pyunifiprotect import ProtectApiClient
from pyunifiprotect.data import (
from pyunifiprotect.data.bootstrap import ProtectDeviceRef
from pyunifiprotect.test_util.anonymize import random_hex
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant, split_entity_id
from homeassistant.helpers import device_registry as dr, entity_registry as er
from homeassistant.helpers.entity import EntityDescription
import homeassistant.util.dt as dt_util
from tests.common import MockConfigEntry, async_fire_time_changed
@dataclass
class MockUFPFixture:
    """Mock for NVR.

    Declared as a dataclass so the annotated fields below become
    constructor arguments, e.g. ``MockUFPFixture(entry=..., api=...)``.
    """

    # Required constructor argument, annotated as a MockConfigEntry.
    entry: MockConfigEntry
    # Required constructor argument, annotated as a ProtectApiClient.
    api: ProtectApiClient
    # Optional callback that receives messages passed to ws_msg; None until set.
    ws_subscription: Callable[[WSSubscriptionMessage], None] | None = None

    def ws_msg(self, msg: WSSubscriptionMessage) -> Any:
        """Emit WS message for testing.

        Forwards ``msg`` to ``ws_subscription`` and returns whatever that
        callback returns. Returns None when no callback is registered.
        """
        if self.ws_subscription is None:
            return None
        return self.ws_subscription(msg)
def reset_objects(bootstrap: Bootstrap):
    """Replace each per-type collection on the bootstrap with an empty dict."""
    for collection in (
        "cameras",
        "lights",
        "sensors",
        "viewers",
        "events",
        "doorlocks",
        "chimes",
    ):
        # A new dict per attribute, so no two collections share storage.
        setattr(bootstrap, collection, {})
async def time_changed(hass: HomeAssistant, seconds: int) -> None:
    """Trigger time changed.

    Fires a time-changed event for ``seconds`` seconds after the current
    UTC time, then awaits ``hass.async_block_till_done()``.
    """
    # timedelta's first positional parameter is *days*, so the unit has to be
    # named explicitly for the offset to be measured in seconds.
    next_update = dt_util.utcnow() + timedelta(seconds=seconds)
    async_fire_time_changed(hass, next_update)
    await hass.async_block_till_done()
async def enable_entity(
    hass: HomeAssistant, entry_id: str, entity_id: str
) -> er.RegistryEntry:
    """Clear an entity's ``disabled_by`` marker and reload its config entry.

    Returns the registry entry produced by the update.
    """
    registry = er.async_get(hass)
    entry = registry.async_update_entity(entity_id, disabled_by=None)
    assert not entry.disabled

    # Reload the config entry, then wait on async_block_till_done.
    await hass.config_entries.async_reload(entry_id)
    await hass.async_block_till_done()
    return entry
def assert_entity_counts(
    hass: HomeAssistant, platform: Platform, total: int, enabled: int
) -> None:
    """Assert entity counts for a given platform.

    ``total`` is compared with the number of entity-registry items whose
    domain (first element of ``split_entity_id``) equals ``platform.value``.
    ``enabled`` is compared with the number of states returned by
    ``hass.states.async_all`` for that same domain.
    """
    entity_registry = er.async_get(hass)
    # Keep only registry items belonging to the requested platform's domain.
    entities = [
        e for e in entity_registry.entities if split_entity_id(e)[0] == platform.value
    ]

    assert len(entities) == total
    assert len(hass.states.async_all(platform.value)) == enabled
def normalize_name(name: str) -> str:
    """Lower-case ``name``, drop colons, and turn spaces and hyphens into underscores."""
    # One translation pass: ":" is deleted, " " and "-" both map to "_".
    char_map = str.maketrans({":": None, " ": "_", "-": "_"})
    lowered = name.lower()
    return lowered.translate(char_map)
def ids_from_device_description(
    platform: Platform,
    device: ProtectAdoptableDeviceModel,
    description: EntityDescription,
) -> tuple[str, str]:
    """Build the expected ``(unique_id, entity_id)`` for a platform/device/description.

    The unique_id is ``<device.mac>_<description.key>``. The entity_id is
    ``<platform.value>.<device name>_<description name>``, with both names
    passed through ``normalize_name``.
    """
    device_slug = normalize_name(device.display_name)
    description_slug = normalize_name(str(description.name))
    return (
        f"{device.mac}_{description.key}",
        f"{platform.value}.{device_slug}_{description_slug}",
    )
def generate_random_ids() -> tuple[str, str]:
    """Return a pair of random identifiers for a device.

    The first element is ``random_hex(24)`` lower-cased; the second is
    ``random_hex(12)`` upper-cased.
    """
    lower_hex = random_hex(24).lower()
    upper_hex = random_hex(12).upper()
    return lower_hex, upper_hex
def regenerate_device_ids(device: ProtectAdoptableDeviceModel) -> None:
    """Overwrite the device's ``id`` and ``mac`` with newly generated values."""
    new_id, new_mac = generate_random_ids()
    device.id = new_id
    device.mac = new_mac
def add_device_ref(bootstrap: Bootstrap, device: ProtectAdoptableDeviceModel) -> None:
    """Register ``device`` in the bootstrap's id and MAC lookup tables."""
    device_ref = ProtectDeviceRef(id=device.id, model=device.model)
    bootstrap.id_lookup[device.id] = device_ref
    # The MAC table is keyed by the lower-cased MAC.
    mac_key = device.mac.lower()
    bootstrap.mac_lookup[mac_key] = device_ref
def add_device(
    bootstrap: Bootstrap, device: ProtectAdoptableDeviceModel, regenerate_ids: bool
) -> None:
    """Add test device to bootstrap.

    Points the device (and, for cameras, each of its channels) at the
    bootstrap's API object, optionally gives it fresh IDs, stores it in the
    bootstrap collection named ``"<model value>s"``, and registers its
    lookup reference.

    Does nothing when ``device.model`` is None.
    """
    if device.model is None:
        # No model means no "<model>s" collection to file the device under;
        # continuing would fail on ``device.model.value`` below.
        return

    device._api = bootstrap.api
    if isinstance(device, Camera):
        for channel in device.channels:
            channel._api = bootstrap.api

    if regenerate_ids:
        regenerate_device_ids(device)
        # Snapshot is taken after the new IDs are in place.
        device._initial_data = device.dict()

    devices = getattr(bootstrap, f"{device.model.value}s")
    devices[device.id] = device
    add_device_ref(bootstrap, device)
async def init_entry(
    hass: HomeAssistant,
    ufp: MockUFPFixture,
    devices: Sequence[ProtectAdoptableDeviceModel],
    regenerate_ids: bool = True,
) -> None:
    """Load ``devices`` into the fixture's bootstrap, then set up its config entry."""
    for protect_device in devices:
        add_device(ufp.api.bootstrap, protect_device, regenerate_ids)

    entry_id = ufp.entry.entry_id
    await hass.config_entries.async_setup(entry_id)
    await hass.async_block_till_done()
async def remove_entities(
    hass: HomeAssistant,
    ufp: MockUFPFixture,
    ufp_devices: list[ProtectAdoptableDeviceModel],
) -> None:
    """Remove all entities for given Protect devices.

    Each device with ``is_adopted_by_us`` set is deleted from the bootstrap
    collection named ``"<model value>s"``. A mock message with the device
    as ``old_obj`` and ``new_obj`` set to None is then sent through
    ``ufp.ws_msg``. Devices without that flag are skipped. Finally
    ``time_changed(hass, 30)`` is awaited.
    """
    for ufp_device in ufp_devices:
        if not ufp_device.is_adopted_by_us:
            continue

        devices = getattr(ufp.api.bootstrap, f"{ufp_device.model.value}s")
        del devices[ufp_device.id]

        mock_msg = Mock()
        mock_msg.changed_data = {}
        mock_msg.old_obj = ufp_device
        mock_msg.new_obj = None
        # Deliver the message; building it without sending has no effect.
        ufp.ws_msg(mock_msg)

    await time_changed(hass, 30)
# Purpose (from the visible lines): for each given device, optionally flip its
# adoption flags, store it in the bootstrap collection named "<model value>s",
# and build a Mock message whose new_obj is an Event carrying the device id.
async def adopt_devices(
hass: HomeAssistant,
ufp: MockUFPFixture,
ufp_devices: list[ProtectAdoptableDeviceModel],
fully_adopt: bool = False,
# NOTE(review): the closing `)` and return annotation of this signature are
# not visible in this view — confirm against the full file.
"""Emit WS to re-adopt give Protect devices."""
for ufp_device in ufp_devices:
# When fully_adopt is set, mark the device as adopted, not adopted by another
# controller, and no longer adoptable.
if fully_adopt:
ufp_device.is_adopted = True
ufp_device.is_adopted_by_other = False
ufp_device.can_adopt = False
# Store the device in the bootstrap collection selected by its model value.
devices = getattr(ufp.api.bootstrap, f"{ufp_device.model.value}s")
devices[ufp_device.id] = ufp_device
# Build the message object; changed_data is left empty.
mock_msg = Mock()
mock_msg.changed_data = {}
# NOTE(review): only the `metadata` argument of Event(...) is visible and the
# call is never closed; the remaining arguments, and presumably a
# `ufp.ws_msg(mock_msg)` call that actually sends the message, are not shown
# here — confirm against the full file.
mock_msg.new_obj = Event(
metadata={"device_id": ufp_device.id},
# NOTE(review): cannot tell from this view whether the await below runs once
# after the loop or once per device — TODO confirm.
await hass.async_block_till_done()
# Looks up one device-registry entry for the given Protect device and returns
# the result of registry.async_get_device (annotated as DeviceEntry or None).
def get_device_from_ufp_device(
hass: HomeAssistant, device: ProtectAdoptableDeviceModel
) -> dr.DeviceEntry | None:
"""Return all device by type."""
# NOTE(review): the docstring wording does not match the visible code, which
# performs a single lookup keyed on the device's MAC, not a by-type listing.
registry = dr.async_get(hass)
# No identifiers are supplied; the only key is the network-MAC connection
# tuple built from device.mac.
return registry.async_get_device(
identifiers=set(), connections={(dr.CONNECTION_NETWORK_MAC, device.mac)}
# NOTE(review): the closing `)` of this call lies beyond the last visible line.