Extract bypassed attribute in Risco zones to a switch (#81137)

* Split bypassed to a switch

* Address code review comments
This commit is contained in:
On Freund 2022-11-01 01:29:00 +02:00 committed by GitHub
parent f8de4c3931
commit 009d5aedd5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 430 additions and 261 deletions

View file

@ -40,7 +40,12 @@ from .const import (
TYPE_LOCAL,
)
PLATFORMS = [Platform.ALARM_CONTROL_PANEL, Platform.BINARY_SENSOR, Platform.SENSOR]
PLATFORMS = [
Platform.ALARM_CONTROL_PANEL,
Platform.BINARY_SENSOR,
Platform.SENSOR,
Platform.SWITCH,
]
LAST_EVENT_STORAGE_VERSION = 1
LAST_EVENT_TIMESTAMP_KEY = "last_event_timestamp"
_LOGGER = logging.getLogger(__name__)

View file

@ -40,7 +40,7 @@ from .const import (
RISCO_GROUPS,
RISCO_PARTIAL_ARM,
)
from .entity import RiscoEntity
from .entity import RiscoCloudEntity
_LOGGER = logging.getLogger(__name__)
@ -178,7 +178,7 @@ class RiscoAlarm(AlarmControlPanelEntity):
raise NotImplementedError
class RiscoCloudAlarm(RiscoAlarm, RiscoEntity):
class RiscoCloudAlarm(RiscoAlarm, RiscoCloudEntity):
"""Representation of a Risco partition."""
def __init__(

View file

@ -12,21 +12,11 @@ from homeassistant.components.binary_sensor import (
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers import entity_platform
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import LocalData, RiscoDataUpdateCoordinator, is_local, zone_update_signal
from . import LocalData, RiscoDataUpdateCoordinator, is_local
from .const import DATA_COORDINATOR, DOMAIN
from .entity import RiscoEntity, binary_sensor_unique_id
SERVICE_BYPASS_ZONE = "bypass_zone"
SERVICE_UNBYPASS_ZONE = "unbypass_zone"
def _unique_id_for_local(system_id: str, zone_id: int) -> str:
return f"{system_id}_zone_{zone_id}_local"
from .entity import RiscoCloudZoneEntity, RiscoLocalZoneEntity
async def async_setup_entry(
@ -35,12 +25,6 @@ async def async_setup_entry(
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up the Risco alarm control panel."""
platform = entity_platform.async_get_current_platform()
platform.async_register_entity_service(SERVICE_BYPASS_ZONE, {}, "async_bypass_zone")
platform.async_register_entity_service(
SERVICE_UNBYPASS_ZONE, {}, "async_unbypass_zone"
)
if is_local(config_entry):
local_data: LocalData = hass.data[DOMAIN][config_entry.entry_id]
async_add_entities(
@ -61,85 +45,34 @@ async def async_setup_entry(
)
class RiscoBinarySensor(BinarySensorEntity):
"""Representation of a Risco zone as a binary sensor."""
class RiscoCloudBinarySensor(RiscoCloudZoneEntity, BinarySensorEntity):
"""Representation of a Risco cloud zone as a binary sensor."""
_attr_device_class = BinarySensorDeviceClass.MOTION
def __init__(self, *, zone_id: int, zone: Zone, **kwargs: Any) -> None:
def __init__(
self, coordinator: RiscoDataUpdateCoordinator, zone_id: int, zone: Zone
) -> None:
"""Init the zone."""
super().__init__(**kwargs)
self._zone_id = zone_id
self._zone = zone
self._attr_has_entity_name = True
self._attr_name = None
@property
def extra_state_attributes(self) -> Mapping[str, Any] | None:
"""Return the state attributes."""
return {"zone_id": self._zone_id, "bypassed": self._zone.bypassed}
super().__init__(
coordinator=coordinator, name=None, suffix="", zone_id=zone_id, zone=zone
)
@property
def is_on(self) -> bool | None:
"""Return true if sensor is on."""
return self._zone.triggered
async def async_bypass_zone(self) -> None:
"""Bypass this zone."""
await self._bypass(True)
async def async_unbypass_zone(self) -> None:
"""Unbypass this zone."""
await self._bypass(False)
async def _bypass(self, bypass: bool) -> None:
raise NotImplementedError
class RiscoCloudBinarySensor(RiscoBinarySensor, RiscoEntity):
"""Representation of a Risco cloud zone as a binary sensor."""
def __init__(
self, coordinator: RiscoDataUpdateCoordinator, zone_id: int, zone: Zone
) -> None:
"""Init the zone."""
super().__init__(zone_id=zone_id, zone=zone, coordinator=coordinator)
self._attr_unique_id = binary_sensor_unique_id(self._risco, zone_id)
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, self._attr_unique_id)},
manufacturer="Risco",
name=self._zone.name,
)
def _get_data_from_coordinator(self) -> None:
self._zone = self.coordinator.data.zones[self._zone_id]
async def _bypass(self, bypass: bool) -> None:
alarm = await self._risco.bypass_zone(self._zone_id, bypass)
self._zone = alarm.zones[self._zone_id]
self.async_write_ha_state()
class RiscoLocalBinarySensor(RiscoBinarySensor):
class RiscoLocalBinarySensor(RiscoLocalZoneEntity, BinarySensorEntity):
"""Representation of a Risco local zone as a binary sensor."""
_attr_should_poll = False
_attr_device_class = BinarySensorDeviceClass.MOTION
def __init__(self, system_id: str, zone_id: int, zone: Zone) -> None:
"""Init the zone."""
super().__init__(zone_id=zone_id, zone=zone)
self._attr_unique_id = _unique_id_for_local(system_id, zone_id)
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, self._attr_unique_id)},
manufacturer="Risco",
name=self._zone.name,
)
async def async_added_to_hass(self) -> None:
"""Subscribe to updates."""
signal = zone_update_signal(self._zone_id)
self.async_on_remove(
async_dispatcher_connect(self.hass, signal, self.async_write_ha_state)
super().__init__(
system_id=system_id, name=None, suffix="", zone_id=zone_id, zone=zone
)
@property
@ -150,42 +83,27 @@ class RiscoLocalBinarySensor(RiscoBinarySensor):
"groups": self._zone.groups,
}
async def _bypass(self, bypass: bool) -> None:
await self._zone.bypass(bypass)
@property
def is_on(self) -> bool | None:
"""Return true if sensor is on."""
return self._zone.triggered
class RiscoLocalAlarmedBinarySensor(BinarySensorEntity):
class RiscoLocalAlarmedBinarySensor(RiscoLocalZoneEntity, BinarySensorEntity):
"""Representation whether a zone in Risco local is currently triggering an alarm."""
_attr_should_poll = False
def __init__(self, system_id: str, zone_id: int, zone: Zone) -> None:
"""Init the zone."""
super().__init__()
self._zone_id = zone_id
self._zone = zone
self._attr_has_entity_name = True
self._attr_name = "Alarmed"
device_unique_id = _unique_id_for_local(system_id, zone_id)
self._attr_unique_id = device_unique_id + "_alarmed"
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, device_unique_id)},
manufacturer="Risco",
name=self._zone.name,
super().__init__(
system_id=system_id,
name="Alarmed",
suffix="_alarmed",
zone_id=zone_id,
zone=zone,
)
async def async_added_to_hass(self) -> None:
"""Subscribe to updates."""
signal = zone_update_signal(self._zone_id)
self.async_on_remove(
async_dispatcher_connect(self.hass, signal, self.async_write_ha_state)
)
@property
def extra_state_attributes(self) -> Mapping[str, Any] | None:
"""Return the state attributes."""
return {"zone_id": self._zone_id}
@property
def is_on(self) -> bool | None:
"""Return true if sensor is on."""

View file

@ -1,25 +1,40 @@
"""A risco entity base class."""
from __future__ import annotations
from typing import Any
from pyrisco.common import Zone
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity import DeviceInfo, Entity
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from . import RiscoDataUpdateCoordinator
from . import RiscoDataUpdateCoordinator, zone_update_signal
from .const import DOMAIN
def binary_sensor_unique_id(risco, zone_id: int) -> str:
"""Return unique id for the binary sensor."""
def zone_unique_id(risco, zone_id: int) -> str:
"""Return unique id for a cloud zone."""
return f"{risco.site_uuid}_zone_{zone_id}"
class RiscoEntity(CoordinatorEntity[RiscoDataUpdateCoordinator]):
"""Risco entity base class."""
class RiscoCloudEntity(CoordinatorEntity[RiscoDataUpdateCoordinator]):
"""Risco cloud entity base class."""
def _get_data_from_coordinator(self):
def __init__(
self, *, coordinator: RiscoDataUpdateCoordinator, **kwargs: Any
) -> None:
"""Init the entity."""
super().__init__(coordinator=coordinator, **kwargs)
def _get_data_from_coordinator(self) -> None:
raise NotImplementedError
def _refresh_from_coordinator(self):
def _refresh_from_coordinator(self) -> None:
self._get_data_from_coordinator()
self.async_write_ha_state()
async def async_added_to_hass(self):
async def async_added_to_hass(self) -> None:
"""When entity is added to hass."""
self.async_on_remove(
self.coordinator.async_add_listener(self._refresh_from_coordinator)
@ -29,3 +44,74 @@ class RiscoEntity(CoordinatorEntity[RiscoDataUpdateCoordinator]):
def _risco(self):
"""Return the Risco API object."""
return self.coordinator.risco
class RiscoCloudZoneEntity(RiscoCloudEntity):
"""Risco cloud zone entity base class."""
_attr_has_entity_name = True
def __init__(
self,
*,
coordinator: RiscoDataUpdateCoordinator,
name: str | None,
suffix: str,
zone_id: int,
zone: Zone,
**kwargs: Any,
) -> None:
"""Init the zone."""
super().__init__(coordinator=coordinator, **kwargs)
self._zone_id = zone_id
self._zone = zone
self._attr_name = name
device_unique_id = zone_unique_id(self._risco, zone_id)
self._attr_unique_id = f"{device_unique_id}{suffix}"
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, device_unique_id)},
manufacturer="Risco",
name=self._zone.name,
)
self._attr_extra_state_attributes = {"zone_id": zone_id}
def _get_data_from_coordinator(self) -> None:
self._zone = self.coordinator.data.zones[self._zone_id]
class RiscoLocalZoneEntity(Entity):
"""Risco local zone entity base class."""
_attr_should_poll = False
_attr_has_entity_name = True
def __init__(
self,
*,
system_id: str,
name: str | None,
suffix: str,
zone_id: int,
zone: Zone,
**kwargs: Any,
) -> None:
"""Init the zone."""
super().__init__(**kwargs)
self._zone_id = zone_id
self._zone = zone
self._attr_name = name
device_unique_id = f"{system_id}_zone_{zone_id}_local"
self._attr_unique_id = f"{device_unique_id}{suffix}"
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, device_unique_id)},
manufacturer="Risco",
name=zone.name,
)
self._attr_extra_state_attributes = {"zone_id": zone_id}
async def async_added_to_hass(self) -> None:
"""Subscribe to updates."""
signal = zone_update_signal(self._zone_id)
self.async_on_remove(
async_dispatcher_connect(self.hass, signal, self.async_write_ha_state)
)

View file

@ -15,7 +15,7 @@ from homeassistant.util import dt as dt_util
from . import RiscoEventsDataUpdateCoordinator, is_local
from .const import DOMAIN, EVENTS_COORDINATOR
from .entity import binary_sensor_unique_id
from .entity import zone_unique_id
CATEGORIES = {
2: "Alarm",
@ -115,11 +115,9 @@ class RiscoSensor(CoordinatorEntity, SensorEntity):
attrs = {atr: getattr(self._event, atr, None) for atr in EVENT_ATTRIBUTES}
if self._event.zone_id is not None:
zone_unique_id = binary_sensor_unique_id(
self.coordinator.risco, self._event.zone_id
)
uid = zone_unique_id(self.coordinator.risco, self._event.zone_id)
zone_entity_id = self._entity_registry.async_get_entity_id(
BS_DOMAIN, DOMAIN, zone_unique_id
BS_DOMAIN, DOMAIN, uid
)
if zone_entity_id is not None:
attrs["zone_entity_id"] = zone_entity_id

View file

@ -1,17 +0,0 @@
# Describes the format for available Risco services
bypass_zone:
name: Bypass zone
description: Bypass a Risco Zone
target:
entity:
integration: risco
domain: binary_sensor
unbypass_zone:
name: Unbypass zone
description: Unbypass a Risco Zone
target:
entity:
integration: risco
domain: binary_sensor

View file

@ -0,0 +1,104 @@
"""Support for bypassing Risco alarm zones."""
from __future__ import annotations
from pyrisco.common import Zone
from homeassistant.components.switch import SwitchEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity import EntityCategory
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import LocalData, RiscoDataUpdateCoordinator, is_local
from .const import DATA_COORDINATOR, DOMAIN
from .entity import RiscoCloudZoneEntity, RiscoLocalZoneEntity
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up the Risco switch."""
if is_local(config_entry):
local_data: LocalData = hass.data[DOMAIN][config_entry.entry_id]
async_add_entities(
RiscoLocalSwitch(local_data.system.id, zone_id, zone)
for zone_id, zone in local_data.system.zones.items()
)
else:
coordinator: RiscoDataUpdateCoordinator = hass.data[DOMAIN][
config_entry.entry_id
][DATA_COORDINATOR]
async_add_entities(
RiscoCloudSwitch(coordinator, zone_id, zone)
for zone_id, zone in coordinator.data.zones.items()
)
class RiscoCloudSwitch(RiscoCloudZoneEntity, SwitchEntity):
"""Representation of a bypass switch for a Risco cloud zone."""
_attr_entity_category = EntityCategory.CONFIG
def __init__(
self, coordinator: RiscoDataUpdateCoordinator, zone_id: int, zone: Zone
) -> None:
"""Init the zone."""
super().__init__(
coordinator=coordinator,
name="Bypassed",
suffix="_bypassed",
zone_id=zone_id,
zone=zone,
)
@property
def is_on(self) -> bool | None:
"""Return true if the zone is bypassed."""
return self._zone.bypassed
async def async_turn_on(self, **kwargs):
"""Turn the entity on."""
await self._bypass(True)
async def async_turn_off(self, **kwargs):
"""Turn the entity off."""
await self._bypass(False)
async def _bypass(self, bypass: bool) -> None:
alarm = await self._risco.bypass_zone(self._zone_id, bypass)
self._zone = alarm.zones[self._zone_id]
self.async_write_ha_state()
class RiscoLocalSwitch(RiscoLocalZoneEntity, SwitchEntity):
"""Representation of a bypass switch for a Risco local zone."""
_attr_entity_category = EntityCategory.CONFIG
def __init__(self, system_id: str, zone_id: int, zone: Zone) -> None:
"""Init the zone."""
super().__init__(
system_id=system_id,
name="Bypassed",
suffix="_bypassed",
zone_id=zone_id,
zone=zone,
)
@property
def is_on(self) -> bool | None:
"""Return true if the zone is bypassed."""
return self._zone.bypassed
async def async_turn_on(self, **kwargs):
"""Turn the entity on."""
await self._bypass(True)
async def async_turn_off(self, **kwargs):
"""Turn the entity off."""
await self._bypass(False)
async def _bypass(self, bypass: bool) -> None:
await self._zone.bypass(bypass)

View file

@ -39,10 +39,14 @@ def two_zone_cloud():
zone_mocks[0], "id", new_callable=PropertyMock(return_value=0)
), patch.object(
zone_mocks[0], "name", new_callable=PropertyMock(return_value="Zone 0")
), patch.object(
zone_mocks[0], "bypassed", new_callable=PropertyMock(return_value=False)
), patch.object(
zone_mocks[1], "id", new_callable=PropertyMock(return_value=1)
), patch.object(
zone_mocks[1], "name", new_callable=PropertyMock(return_value="Zone 1")
), patch.object(
zone_mocks[1], "bypassed", new_callable=PropertyMock(return_value=False)
), patch.object(
alarm_mock,
"zones",
@ -54,6 +58,36 @@ def two_zone_cloud():
yield zone_mocks
@fixture
def two_zone_local():
"""Fixture to mock alarm with two zones."""
zone_mocks = {0: zone_mock(), 1: zone_mock()}
with patch.object(
zone_mocks[0], "id", new_callable=PropertyMock(return_value=0)
), patch.object(
zone_mocks[0], "name", new_callable=PropertyMock(return_value="Zone 0")
), patch.object(
zone_mocks[0], "alarmed", new_callable=PropertyMock(return_value=False)
), patch.object(
zone_mocks[0], "bypassed", new_callable=PropertyMock(return_value=False)
), patch.object(
zone_mocks[1], "id", new_callable=PropertyMock(return_value=1)
), patch.object(
zone_mocks[1], "name", new_callable=PropertyMock(return_value="Zone 1")
), patch.object(
zone_mocks[1], "alarmed", new_callable=PropertyMock(return_value=False)
), patch.object(
zone_mocks[1], "bypassed", new_callable=PropertyMock(return_value=False)
), patch(
"homeassistant.components.risco.RiscoLocal.partitions",
new_callable=PropertyMock(return_value={}),
), patch(
"homeassistant.components.risco.RiscoLocal.zones",
new_callable=PropertyMock(return_value=zone_mocks),
):
yield zone_mocks
@fixture
def options():
"""Fixture for default (empty) options."""

View file

@ -9,7 +9,7 @@ from homeassistant.const import STATE_OFF, STATE_ON
from homeassistant.helpers import device_registry as dr, entity_registry as er
from homeassistant.helpers.entity_component import async_update_entity
from .util import TEST_SITE_UUID, zone_mock
from .util import TEST_SITE_UUID
FIRST_ENTITY_ID = "binary_sensor.zone_0"
SECOND_ENTITY_ID = "binary_sensor.zone_1"
@ -17,32 +17,6 @@ FIRST_ALARMED_ENTITY_ID = FIRST_ENTITY_ID + "_alarmed"
SECOND_ALARMED_ENTITY_ID = SECOND_ENTITY_ID + "_alarmed"
@pytest.fixture
def two_zone_local():
"""Fixture to mock alarm with two zones."""
zone_mocks = {0: zone_mock(), 1: zone_mock()}
with patch.object(
zone_mocks[0], "id", new_callable=PropertyMock(return_value=0)
), patch.object(
zone_mocks[0], "name", new_callable=PropertyMock(return_value="Zone 0")
), patch.object(
zone_mocks[0], "alarmed", new_callable=PropertyMock(return_value=False)
), patch.object(
zone_mocks[1], "id", new_callable=PropertyMock(return_value=1)
), patch.object(
zone_mocks[1], "name", new_callable=PropertyMock(return_value="Zone 1")
), patch.object(
zone_mocks[1], "alarmed", new_callable=PropertyMock(return_value=False)
), patch(
"homeassistant.components.risco.RiscoLocal.partitions",
new_callable=PropertyMock(return_value={}),
), patch(
"homeassistant.components.risco.RiscoLocal.zones",
new_callable=PropertyMock(return_value=zone_mocks),
):
yield zone_mocks
@pytest.mark.parametrize("exception", [CannotConnectError, UnauthorizedError])
async def test_error_on_login(hass, login_with_error, cloud_config_entry):
"""Test error on login."""
@ -69,59 +43,26 @@ async def test_cloud_setup(hass, two_zone_cloud, setup_risco_cloud):
assert device.manufacturer == "Risco"
async def _check_cloud_state(hass, zones, triggered, bypassed, entity_id, zone_id):
async def _check_cloud_state(hass, zones, triggered, entity_id, zone_id):
with patch.object(
zones[zone_id],
"triggered",
new_callable=PropertyMock(return_value=triggered),
), patch.object(
zones[zone_id],
"bypassed",
new_callable=PropertyMock(return_value=bypassed),
):
await async_update_entity(hass, entity_id)
await hass.async_block_till_done()
expected_triggered = STATE_ON if triggered else STATE_OFF
assert hass.states.get(entity_id).state == expected_triggered
assert hass.states.get(entity_id).attributes["bypassed"] == bypassed
assert hass.states.get(entity_id).attributes["zone_id"] == zone_id
async def test_cloud_states(hass, two_zone_cloud, setup_risco_cloud):
"""Test the various alarm states."""
await _check_cloud_state(hass, two_zone_cloud, True, True, FIRST_ENTITY_ID, 0)
await _check_cloud_state(hass, two_zone_cloud, True, False, FIRST_ENTITY_ID, 0)
await _check_cloud_state(hass, two_zone_cloud, False, True, FIRST_ENTITY_ID, 0)
await _check_cloud_state(hass, two_zone_cloud, False, False, FIRST_ENTITY_ID, 0)
await _check_cloud_state(hass, two_zone_cloud, True, True, SECOND_ENTITY_ID, 1)
await _check_cloud_state(hass, two_zone_cloud, True, False, SECOND_ENTITY_ID, 1)
await _check_cloud_state(hass, two_zone_cloud, False, True, SECOND_ENTITY_ID, 1)
await _check_cloud_state(hass, two_zone_cloud, False, False, SECOND_ENTITY_ID, 1)
async def test_cloud_bypass(hass, two_zone_cloud, setup_risco_cloud):
"""Test bypassing a zone."""
with patch("homeassistant.components.risco.RiscoCloud.bypass_zone") as mock:
data = {"entity_id": FIRST_ENTITY_ID}
await hass.services.async_call(
DOMAIN, "bypass_zone", service_data=data, blocking=True
)
mock.assert_awaited_once_with(0, True)
async def test_cloud_unbypass(hass, two_zone_cloud, setup_risco_cloud):
"""Test unbypassing a zone."""
with patch("homeassistant.components.risco.RiscoCloud.bypass_zone") as mock:
data = {"entity_id": FIRST_ENTITY_ID}
await hass.services.async_call(
DOMAIN, "unbypass_zone", service_data=data, blocking=True
)
mock.assert_awaited_once_with(0, False)
await _check_cloud_state(hass, two_zone_cloud, True, FIRST_ENTITY_ID, 0)
await _check_cloud_state(hass, two_zone_cloud, False, FIRST_ENTITY_ID, 0)
await _check_cloud_state(hass, two_zone_cloud, True, SECOND_ENTITY_ID, 1)
await _check_cloud_state(hass, two_zone_cloud, False, SECOND_ENTITY_ID, 1)
@pytest.mark.parametrize("exception", [CannotConnectError, UnauthorizedError])
@ -154,24 +95,17 @@ async def test_local_setup(hass, two_zone_local, setup_risco_local):
assert device.manufacturer == "Risco"
async def _check_local_state(
hass, zones, triggered, bypassed, entity_id, zone_id, callback
):
async def _check_local_state(hass, zones, triggered, entity_id, zone_id, callback):
with patch.object(
zones[zone_id],
"triggered",
new_callable=PropertyMock(return_value=triggered),
), patch.object(
zones[zone_id],
"bypassed",
new_callable=PropertyMock(return_value=bypassed),
):
await callback(zone_id, zones[zone_id])
await hass.async_block_till_done()
expected_triggered = STATE_ON if triggered else STATE_OFF
assert hass.states.get(entity_id).state == expected_triggered
assert hass.states.get(entity_id).attributes["bypassed"] == bypassed
assert hass.states.get(entity_id).attributes["zone_id"] == zone_id
@ -205,30 +139,10 @@ async def test_local_states(
assert callback is not None
await _check_local_state(
hass, two_zone_local, True, True, FIRST_ENTITY_ID, 0, callback
)
await _check_local_state(
hass, two_zone_local, True, False, FIRST_ENTITY_ID, 0, callback
)
await _check_local_state(
hass, two_zone_local, False, True, FIRST_ENTITY_ID, 0, callback
)
await _check_local_state(
hass, two_zone_local, False, False, FIRST_ENTITY_ID, 0, callback
)
await _check_local_state(
hass, two_zone_local, True, True, SECOND_ENTITY_ID, 1, callback
)
await _check_local_state(
hass, two_zone_local, True, False, SECOND_ENTITY_ID, 1, callback
)
await _check_local_state(
hass, two_zone_local, False, True, SECOND_ENTITY_ID, 1, callback
)
await _check_local_state(
hass, two_zone_local, False, False, SECOND_ENTITY_ID, 1, callback
)
await _check_local_state(hass, two_zone_local, True, FIRST_ENTITY_ID, 0, callback)
await _check_local_state(hass, two_zone_local, False, FIRST_ENTITY_ID, 0, callback)
await _check_local_state(hass, two_zone_local, True, SECOND_ENTITY_ID, 1, callback)
await _check_local_state(hass, two_zone_local, False, SECOND_ENTITY_ID, 1, callback)
async def test_alarmed_local_states(
@ -251,27 +165,3 @@ async def test_alarmed_local_states(
await _check_alarmed_local_state(
hass, two_zone_local, False, SECOND_ALARMED_ENTITY_ID, 1, callback
)
async def test_local_bypass(hass, two_zone_local, setup_risco_local):
"""Test bypassing a zone."""
with patch.object(two_zone_local[0], "bypass") as mock:
data = {"entity_id": FIRST_ENTITY_ID}
await hass.services.async_call(
DOMAIN, "bypass_zone", service_data=data, blocking=True
)
mock.assert_awaited_once_with(True)
async def test_local_unbypass(hass, two_zone_local, setup_risco_local):
"""Test unbypassing a zone."""
with patch.object(two_zone_local[0], "bypass") as mock:
data = {"entity_id": FIRST_ENTITY_ID}
await hass.services.async_call(
DOMAIN, "unbypass_zone", service_data=data, blocking=True
)
mock.assert_awaited_once_with(False)

View file

@ -0,0 +1,151 @@
"""Tests for the Risco binary sensors."""
from unittest.mock import PropertyMock, patch
import pytest
from homeassistant.components.risco import CannotConnectError, UnauthorizedError
from homeassistant.components.switch import DOMAIN as SWITCH_DOMAIN
from homeassistant.const import SERVICE_TURN_OFF, SERVICE_TURN_ON, STATE_OFF, STATE_ON
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.entity_component import async_update_entity
FIRST_ENTITY_ID = "switch.zone_0_bypassed"
SECOND_ENTITY_ID = "switch.zone_1_bypassed"
@pytest.mark.parametrize("exception", [CannotConnectError, UnauthorizedError])
async def test_error_on_login(hass, login_with_error, cloud_config_entry):
"""Test error on login."""
await hass.config_entries.async_setup(cloud_config_entry.entry_id)
await hass.async_block_till_done()
registry = er.async_get(hass)
assert not registry.async_is_registered(FIRST_ENTITY_ID)
assert not registry.async_is_registered(SECOND_ENTITY_ID)
async def test_cloud_setup(hass, two_zone_cloud, setup_risco_cloud):
"""Test entity setup."""
registry = er.async_get(hass)
assert registry.async_is_registered(FIRST_ENTITY_ID)
assert registry.async_is_registered(SECOND_ENTITY_ID)
async def _check_cloud_state(hass, zones, bypassed, entity_id, zone_id):
with patch.object(
zones[zone_id],
"bypassed",
new_callable=PropertyMock(return_value=bypassed),
):
await async_update_entity(hass, entity_id)
await hass.async_block_till_done()
expected_bypassed = STATE_ON if bypassed else STATE_OFF
assert hass.states.get(entity_id).state == expected_bypassed
assert hass.states.get(entity_id).attributes["zone_id"] == zone_id
async def test_cloud_states(hass, two_zone_cloud, setup_risco_cloud):
"""Test the various alarm states."""
await _check_cloud_state(hass, two_zone_cloud, True, FIRST_ENTITY_ID, 0)
await _check_cloud_state(hass, two_zone_cloud, False, FIRST_ENTITY_ID, 0)
await _check_cloud_state(hass, two_zone_cloud, True, SECOND_ENTITY_ID, 1)
await _check_cloud_state(hass, two_zone_cloud, False, SECOND_ENTITY_ID, 1)
async def test_cloud_bypass(hass, two_zone_cloud, setup_risco_cloud):
"""Test bypassing a zone."""
with patch("homeassistant.components.risco.RiscoCloud.bypass_zone") as mock:
data = {"entity_id": FIRST_ENTITY_ID}
await hass.services.async_call(
SWITCH_DOMAIN, SERVICE_TURN_ON, service_data=data, blocking=True
)
mock.assert_awaited_once_with(0, True)
async def test_cloud_unbypass(hass, two_zone_cloud, setup_risco_cloud):
"""Test unbypassing a zone."""
with patch("homeassistant.components.risco.RiscoCloud.bypass_zone") as mock:
data = {"entity_id": FIRST_ENTITY_ID}
await hass.services.async_call(
SWITCH_DOMAIN, SERVICE_TURN_OFF, service_data=data, blocking=True
)
mock.assert_awaited_once_with(0, False)
@pytest.mark.parametrize("exception", [CannotConnectError, UnauthorizedError])
async def test_error_on_connect(hass, connect_with_error, local_config_entry):
"""Test error on connect."""
await hass.config_entries.async_setup(local_config_entry.entry_id)
await hass.async_block_till_done()
registry = er.async_get(hass)
assert not registry.async_is_registered(FIRST_ENTITY_ID)
assert not registry.async_is_registered(SECOND_ENTITY_ID)
async def test_local_setup(hass, two_zone_local, setup_risco_local):
"""Test entity setup."""
registry = er.async_get(hass)
assert registry.async_is_registered(FIRST_ENTITY_ID)
assert registry.async_is_registered(SECOND_ENTITY_ID)
async def _check_local_state(hass, zones, bypassed, entity_id, zone_id, callback):
with patch.object(
zones[zone_id],
"bypassed",
new_callable=PropertyMock(return_value=bypassed),
):
await callback(zone_id, zones[zone_id])
await hass.async_block_till_done()
expected_bypassed = STATE_ON if bypassed else STATE_OFF
assert hass.states.get(entity_id).state == expected_bypassed
assert hass.states.get(entity_id).attributes["zone_id"] == zone_id
@pytest.fixture
def _mock_zone_handler():
with patch("homeassistant.components.risco.RiscoLocal.add_zone_handler") as mock:
yield mock
async def test_local_states(
hass, two_zone_local, _mock_zone_handler, setup_risco_local
):
"""Test the various alarm states."""
callback = _mock_zone_handler.call_args.args[0]
assert callback is not None
await _check_local_state(hass, two_zone_local, True, FIRST_ENTITY_ID, 0, callback)
await _check_local_state(hass, two_zone_local, False, FIRST_ENTITY_ID, 0, callback)
await _check_local_state(hass, two_zone_local, True, SECOND_ENTITY_ID, 1, callback)
await _check_local_state(hass, two_zone_local, False, SECOND_ENTITY_ID, 1, callback)
async def test_local_bypass(hass, two_zone_local, setup_risco_local):
"""Test bypassing a zone."""
with patch.object(two_zone_local[0], "bypass") as mock:
data = {"entity_id": FIRST_ENTITY_ID}
await hass.services.async_call(
SWITCH_DOMAIN, SERVICE_TURN_ON, service_data=data, blocking=True
)
mock.assert_awaited_once_with(True)
async def test_local_unbypass(hass, two_zone_local, setup_risco_local):
"""Test unbypassing a zone."""
with patch.object(two_zone_local[0], "bypass") as mock:
data = {"entity_id": FIRST_ENTITY_ID}
await hass.services.async_call(
SWITCH_DOMAIN, SERVICE_TURN_OFF, service_data=data, blocking=True
)
mock.assert_awaited_once_with(False)