Add pin code support to the Risco integration (#39177)

* Pin code support for Risco

* Remove unused parameter

* Fix imports

* Fix typo

* Apply suggestions from code review

Co-authored-by: Chris Talkington <chris@talkingtontech.com>

Co-authored-by: Chris Talkington <chris@talkingtontech.com>
This commit is contained in:
On Freund 2020-08-23 20:34:30 +03:00 committed by GitHub
parent 9e3f7ac8df
commit 15c101e85d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 155 additions and 35 deletions

View file

@ -1,12 +1,16 @@
"""Support for Risco alarms."""
import logging
from homeassistant.components.alarm_control_panel import AlarmControlPanelEntity
from homeassistant.components.alarm_control_panel import (
FORMAT_NUMBER,
AlarmControlPanelEntity,
)
from homeassistant.components.alarm_control_panel.const import (
SUPPORT_ALARM_ARM_AWAY,
SUPPORT_ALARM_ARM_HOME,
)
from homeassistant.const import (
CONF_PIN,
STATE_ALARM_ARMED_AWAY,
STATE_ALARM_ARMED_HOME,
STATE_ALARM_ARMING,
@ -14,7 +18,12 @@ from homeassistant.const import (
STATE_ALARM_TRIGGERED,
)
from .const import DATA_COORDINATOR, DOMAIN
from .const import (
CONF_CODE_ARM_REQUIRED,
CONF_CODE_DISARM_REQUIRED,
DATA_COORDINATOR,
DOMAIN,
)
from .entity import RiscoEntity
_LOGGER = logging.getLogger(__name__)
@ -30,8 +39,11 @@ SUPPORTED_STATES = [
async def async_setup_entry(hass, config_entry, async_add_entities):
"""Set up the Risco alarm control panel."""
coordinator = hass.data[DOMAIN][config_entry.entry_id][DATA_COORDINATOR]
code = config_entry.data[CONF_PIN]
code_arm_req = config_entry.options.get(CONF_CODE_ARM_REQUIRED, False)
code_disarm_req = config_entry.options.get(CONF_CODE_DISARM_REQUIRED, False)
entities = [
RiscoAlarm(coordinator, partition_id)
RiscoAlarm(coordinator, partition_id, code, code_arm_req, code_disarm_req)
for partition_id in coordinator.data.partitions
]
@ -41,11 +53,16 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
class RiscoAlarm(AlarmControlPanelEntity, RiscoEntity):
"""Representation of a Risco partition."""
def __init__(self, coordinator, partition_id):
def __init__(
self, coordinator, partition_id, code, code_arm_required, code_disarm_required
):
"""Init the partition."""
super().__init__(coordinator)
self._partition_id = partition_id
self._partition = self._coordinator.data.partitions[self._partition_id]
self._code = code
self._code_arm_required = code_arm_required
self._code_disarm_required = code_disarm_required
def _get_data_from_coordinator(self):
self._partition = self._coordinator.data.partitions[self._partition_id]
@ -93,21 +110,39 @@ class RiscoAlarm(AlarmControlPanelEntity, RiscoEntity):
@property
def code_arm_required(self):
"""Whether the code is required for arm actions."""
return False
return self._code_arm_required
@property
def code_format(self):
"""Return one or more digits/characters."""
return FORMAT_NUMBER
def _validate_code(self, code, state):
"""Validate given code."""
check = code == self._code
if not check:
_LOGGER.warning("Wrong code entered for %s", state)
return check
async def async_alarm_disarm(self, code=None):
"""Send disarm command."""
if self._code_disarm_required and not self._validate_code(code, "disarming"):
return
await self._call_alarm_method("disarm")
async def async_alarm_arm_home(self, code=None):
"""Send arm home command."""
if self._code_arm_required and not self._validate_code(code, "arming home"):
return
await self._call_alarm_method("partial_arm")
async def async_alarm_arm_away(self, code=None):
"""Send arm away command."""
if self._code_arm_required and not self._validate_code(code, "arming away"):
return
await self._call_alarm_method("arm")
async def _call_alarm_method(self, method, code=None):
alarm = await getattr(self._risco, method)(self._partition_id)
self._partition = alarm.partitions[self._partition_id]
async def _call_alarm_method(self, method):
alarm_obj = await getattr(self._risco, method)(self._partition_id)
self._partition = alarm_obj.partitions[self._partition_id]
self.async_write_ha_state()

View file

@ -13,7 +13,12 @@ from homeassistant.const import (
)
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .const import DEFAULT_SCAN_INTERVAL, DOMAIN # pylint:disable=unused-import
from .const import (
CONF_CODE_ARM_REQUIRED,
CONF_CODE_DISARM_REQUIRED,
DEFAULT_SCAN_INTERVAL,
)
from .const import DOMAIN # pylint:disable=unused-import
_LOGGER = logging.getLogger(__name__)
@ -79,17 +84,28 @@ class RiscoOptionsFlowHandler(config_entries.OptionsFlow):
"""Initialize."""
self.config_entry = config_entry
def _options_schema(self):
scan_interval = self.config_entry.options.get(
CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL
)
code_arm_required = self.config_entry.options.get(CONF_CODE_ARM_REQUIRED, False)
code_disarm_required = self.config_entry.options.get(
CONF_CODE_DISARM_REQUIRED, False
)
return vol.Schema(
{
vol.Required(CONF_SCAN_INTERVAL, default=scan_interval): int,
vol.Required(CONF_CODE_ARM_REQUIRED, default=code_arm_required): bool,
vol.Required(
CONF_CODE_DISARM_REQUIRED, default=code_disarm_required
): bool,
}
)
async def async_step_init(self, user_input=None):
"""Manage the options."""
if user_input is not None:
return self.async_create_entry(
title="", data={CONF_SCAN_INTERVAL: user_input[CONF_SCAN_INTERVAL]}
)
return self.async_create_entry(title="", data=user_input)
current = self.config_entry.options.get(
CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL
)
options = vol.Schema({vol.Required(CONF_SCAN_INTERVAL, default=current): int})
return self.async_show_form(step_id="init", data_schema=options)
return self.async_show_form(step_id="init", data_schema=self._options_schema())

View file

@ -5,3 +5,6 @@ DOMAIN = "risco"
DATA_COORDINATOR = "risco"
DEFAULT_SCAN_INTERVAL = 30
CONF_CODE_ARM_REQUIRED = "code_arm_required"
CONF_CODE_DISARM_REQUIRED = "code_disarm_required"

View file

@ -23,7 +23,9 @@
"init": {
"title": "Configure options",
"data": {
"scan_interval": "How often to poll Risco (in seconds)"
"scan_interval": "How often to poll Risco (in seconds)",
"code_arm_required": "Require pin code to arm",
"code_disarm_required": "Require pin code to disarm"
}
}
}

View file

@ -33,6 +33,8 @@ TEST_SITE_NAME = "test-site-name"
FIRST_ENTITY_ID = "alarm_control_panel.risco_test_site_name_partition_0"
SECOND_ENTITY_ID = "alarm_control_panel.risco_test_site_name_partition_1"
CODES_REQUIRED_OPTIONS = {"code_arm_required": True, "code_disarm_required": True}
def _partition_mock():
return MagicMock(
@ -63,8 +65,8 @@ def two_part_alarm():
yield alarm_mock
async def _setup_risco(hass):
config_entry = MockConfigEntry(domain=DOMAIN, data=TEST_CONFIG)
async def _setup_risco(hass, options={}):
config_entry = MockConfigEntry(domain=DOMAIN, data=TEST_CONFIG, options=options)
config_entry.add_to_hass(hass)
with patch(
@ -191,14 +193,22 @@ async def test_states(hass, two_part_alarm):
)
async def _test_servie_call(hass, service, method, entity_id, partition_id):
with patch("homeassistant.components.risco.RiscoAPI." + method) as set_mock:
await _call_alarm_service(hass, service, entity_id)
async def _test_service_call(hass, service, method, entity_id, partition_id, **kwargs):
with patch(f"homeassistant.components.risco.RiscoAPI.{method}") as set_mock:
await _call_alarm_service(hass, service, entity_id, **kwargs)
set_mock.assert_awaited_once_with(partition_id)
async def _call_alarm_service(hass, service, entity_id):
data = {"entity_id": entity_id}
async def _test_no_service_call(
hass, service, method, entity_id, partition_id, **kwargs
):
with patch(f"homeassistant.components.risco.RiscoAPI.{method}") as set_mock:
await _call_alarm_service(hass, service, entity_id, **kwargs)
set_mock.assert_not_awaited()
async def _call_alarm_service(hass, service, entity_id, **kwargs):
data = {"entity_id": entity_id, **kwargs}
await hass.services.async_call(
ALARM_DOMAIN, service, service_data=data, blocking=True
@ -209,13 +219,63 @@ async def test_sets(hass, two_part_alarm):
"""Test settings the various modes."""
await _setup_risco(hass)
await _test_servie_call(hass, SERVICE_ALARM_DISARM, "disarm", FIRST_ENTITY_ID, 0)
await _test_servie_call(hass, SERVICE_ALARM_DISARM, "disarm", SECOND_ENTITY_ID, 1)
await _test_servie_call(hass, SERVICE_ALARM_ARM_AWAY, "arm", FIRST_ENTITY_ID, 0)
await _test_servie_call(hass, SERVICE_ALARM_ARM_AWAY, "arm", SECOND_ENTITY_ID, 1)
await _test_servie_call(
await _test_service_call(hass, SERVICE_ALARM_DISARM, "disarm", FIRST_ENTITY_ID, 0)
await _test_service_call(hass, SERVICE_ALARM_DISARM, "disarm", SECOND_ENTITY_ID, 1)
await _test_service_call(hass, SERVICE_ALARM_ARM_AWAY, "arm", FIRST_ENTITY_ID, 0)
await _test_service_call(hass, SERVICE_ALARM_ARM_AWAY, "arm", SECOND_ENTITY_ID, 1)
await _test_service_call(
hass, SERVICE_ALARM_ARM_HOME, "partial_arm", FIRST_ENTITY_ID, 0
)
await _test_servie_call(
await _test_service_call(
hass, SERVICE_ALARM_ARM_HOME, "partial_arm", SECOND_ENTITY_ID, 1
)
async def test_sets_with_correct_code(hass, two_part_alarm):
"""Test settings the various modes when code is required."""
await _setup_risco(hass, CODES_REQUIRED_OPTIONS)
code = {"code": 1234}
await _test_service_call(
hass, SERVICE_ALARM_DISARM, "disarm", FIRST_ENTITY_ID, 0, **code
)
await _test_service_call(
hass, SERVICE_ALARM_DISARM, "disarm", SECOND_ENTITY_ID, 1, **code
)
await _test_service_call(
hass, SERVICE_ALARM_ARM_AWAY, "arm", FIRST_ENTITY_ID, 0, **code
)
await _test_service_call(
hass, SERVICE_ALARM_ARM_AWAY, "arm", SECOND_ENTITY_ID, 1, **code
)
await _test_service_call(
hass, SERVICE_ALARM_ARM_HOME, "partial_arm", FIRST_ENTITY_ID, 0, **code
)
await _test_service_call(
hass, SERVICE_ALARM_ARM_HOME, "partial_arm", SECOND_ENTITY_ID, 1, **code
)
async def test_sets_with_incorrect_code(hass, two_part_alarm):
"""Test settings the various modes when code is required and incorrect."""
await _setup_risco(hass, CODES_REQUIRED_OPTIONS)
code = {"code": 4321}
await _test_no_service_call(
hass, SERVICE_ALARM_DISARM, "disarm", FIRST_ENTITY_ID, 0, **code
)
await _test_no_service_call(
hass, SERVICE_ALARM_DISARM, "disarm", SECOND_ENTITY_ID, 1, **code
)
await _test_no_service_call(
hass, SERVICE_ALARM_ARM_AWAY, "arm", FIRST_ENTITY_ID, 0, **code
)
await _test_no_service_call(
hass, SERVICE_ALARM_ARM_AWAY, "arm", SECOND_ENTITY_ID, 1, **code
)
await _test_no_service_call(
hass, SERVICE_ALARM_ARM_HOME, "partial_arm", FIRST_ENTITY_ID, 0, **code
)
await _test_no_service_call(
hass, SERVICE_ALARM_ARM_HOME, "partial_arm", SECOND_ENTITY_ID, 1, **code
)

View file

@ -129,7 +129,11 @@ async def test_form_already_exists(hass):
async def test_options_flow(hass):
"""Test options flow."""
conf = {"scan_interval": 10}
conf = {
"scan_interval": 10,
"code_arm_required": True,
"code_disarm_required": True,
}
entry = MockConfigEntry(
domain=DOMAIN, unique_id=TEST_DATA["username"], data=TEST_DATA,