Create APIs for Insteon panel (#49785)

This commit is contained in:
Tom Harris 2021-07-22 14:11:36 -04:00 committed by GitHub
parent 032cae772a
commit 3461f61f9f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 1943 additions and 7 deletions

View file

@ -9,6 +9,7 @@ from homeassistant.config_entries import SOURCE_IMPORT
from homeassistant.const import CONF_PLATFORM, EVENT_HOMEASSISTANT_STOP from homeassistant.const import CONF_PLATFORM, EVENT_HOMEASSISTANT_STOP
from homeassistant.exceptions import ConfigEntryNotReady from homeassistant.exceptions import ConfigEntryNotReady
from . import api
from .const import ( from .const import (
CONF_CAT, CONF_CAT,
CONF_DIM_STEPS, CONF_DIM_STEPS,
@ -164,6 +165,8 @@ async def async_setup_entry(hass, entry):
sw_version=f"{devices.modem.firmware:02x} Engine Version: {devices.modem.engine_version}", sw_version=f"{devices.modem.firmware:02x} Engine Version: {devices.modem.engine_version}",
) )
api.async_load_api(hass)
asyncio.create_task(async_get_device_config(hass, entry)) asyncio.create_task(async_get_device_config(hass, entry))
return True return True

View file

@ -0,0 +1,44 @@
"""Insteon API interface for the frontend."""
from homeassistant.components import websocket_api
from homeassistant.core import callback
from .aldb import (
websocket_add_default_links,
websocket_change_aldb_record,
websocket_create_aldb_record,
websocket_get_aldb,
websocket_load_aldb,
websocket_notify_on_aldb_status,
websocket_reset_aldb,
websocket_write_aldb,
)
from .device import websocket_get_device
from .properties import (
websocket_change_properties_record,
websocket_get_properties,
websocket_load_properties,
websocket_reset_properties,
websocket_write_properties,
)
@callback
def async_load_api(hass):
"""Set up the web socket API."""
websocket_api.async_register_command(hass, websocket_get_device)
websocket_api.async_register_command(hass, websocket_get_aldb)
websocket_api.async_register_command(hass, websocket_change_aldb_record)
websocket_api.async_register_command(hass, websocket_create_aldb_record)
websocket_api.async_register_command(hass, websocket_write_aldb)
websocket_api.async_register_command(hass, websocket_load_aldb)
websocket_api.async_register_command(hass, websocket_reset_aldb)
websocket_api.async_register_command(hass, websocket_add_default_links)
websocket_api.async_register_command(hass, websocket_notify_on_aldb_status)
websocket_api.async_register_command(hass, websocket_get_properties)
websocket_api.async_register_command(hass, websocket_change_properties_record)
websocket_api.async_register_command(hass, websocket_write_properties)
websocket_api.async_register_command(hass, websocket_load_properties)
websocket_api.async_register_command(hass, websocket_reset_properties)

View file

@ -0,0 +1,309 @@
"""Web socket API for Insteon devices."""
from pyinsteon import devices
from pyinsteon.constants import ALDBStatus
from pyinsteon.topics import (
ALDB_STATUS_CHANGED,
DEVICE_LINK_CONTROLLER_CREATED,
DEVICE_LINK_RESPONDER_CREATED,
)
from pyinsteon.utils import subscribe_topic, unsubscribe_topic
import voluptuous as vol
from homeassistant.components import websocket_api
from homeassistant.core import HomeAssistant, callback
from ..const import DEVICE_ADDRESS, ID, INSTEON_DEVICE_NOT_FOUND, TYPE
from .device import async_device_name, notify_device_not_found
ALDB_RECORD = "record"
ALDB_RECORD_SCHEMA = vol.Schema(
{
vol.Required("mem_addr"): int,
vol.Required("in_use"): bool,
vol.Required("group"): vol.Range(0, 255),
vol.Required("is_controller"): bool,
vol.Optional("highwater"): bool,
vol.Required("target"): str,
vol.Optional("target_name"): str,
vol.Required("data1"): vol.Range(0, 255),
vol.Required("data2"): vol.Range(0, 255),
vol.Required("data3"): vol.Range(0, 255),
vol.Optional("dirty"): bool,
}
)
async def async_aldb_record_to_dict(dev_registry, record, dirty=False):
"""Convert an ALDB record to a dict."""
return ALDB_RECORD_SCHEMA(
{
"mem_addr": record.mem_addr,
"in_use": record.is_in_use,
"is_controller": record.is_controller,
"highwater": record.is_high_water_mark,
"group": record.group,
"target": str(record.target),
"target_name": await async_device_name(dev_registry, record.target),
"data1": record.data1,
"data2": record.data2,
"data3": record.data3,
"dirty": dirty,
}
)
async def async_reload_and_save_aldb(hass, device):
"""Add default links to an Insteon device."""
if device == devices.modem:
await device.aldb.async_load()
else:
await device.aldb.async_load(refresh=True)
await devices.async_save(workdir=hass.config.config_dir)
@websocket_api.websocket_command(
{vol.Required(TYPE): "insteon/aldb/get", vol.Required(DEVICE_ADDRESS): str}
)
@websocket_api.require_admin
@websocket_api.async_response
async def websocket_get_aldb(
hass: HomeAssistant,
connection: websocket_api.connection.ActiveConnection,
msg: dict,
) -> None:
"""Get the All-Link Database for an Insteon device."""
device = devices[msg[DEVICE_ADDRESS]]
if not device:
notify_device_not_found(connection, msg, INSTEON_DEVICE_NOT_FOUND)
return
# Convert the ALDB to a dict merge in pending changes
aldb = {mem_addr: device.aldb[mem_addr] for mem_addr in device.aldb}
aldb.update(device.aldb.pending_changes)
changed_records = list(device.aldb.pending_changes.keys())
dev_registry = await hass.helpers.device_registry.async_get_registry()
records = [
await async_aldb_record_to_dict(
dev_registry, aldb[mem_addr], mem_addr in changed_records
)
for mem_addr in aldb
]
connection.send_result(msg[ID], records)
@websocket_api.websocket_command(
{
vol.Required(TYPE): "insteon/aldb/change",
vol.Required(DEVICE_ADDRESS): str,
vol.Required(ALDB_RECORD): ALDB_RECORD_SCHEMA,
}
)
@websocket_api.require_admin
@websocket_api.async_response
async def websocket_change_aldb_record(
hass: HomeAssistant,
connection: websocket_api.connection.ActiveConnection,
msg: dict,
) -> None:
"""Change an All-Link Database record for an Insteon device."""
device = devices[msg[DEVICE_ADDRESS]]
if not device:
notify_device_not_found(connection, msg, INSTEON_DEVICE_NOT_FOUND)
return
record = msg[ALDB_RECORD]
device.aldb.modify(
mem_addr=record["mem_addr"],
in_use=record["in_use"],
group=record["group"],
controller=record["is_controller"],
target=record["target"],
data1=record["data1"],
data2=record["data2"],
data3=record["data3"],
)
connection.send_result(msg[ID])
@websocket_api.websocket_command(
{
vol.Required(TYPE): "insteon/aldb/create",
vol.Required(DEVICE_ADDRESS): str,
vol.Required(ALDB_RECORD): ALDB_RECORD_SCHEMA,
}
)
@websocket_api.require_admin
@websocket_api.async_response
async def websocket_create_aldb_record(
hass: HomeAssistant,
connection: websocket_api.connection.ActiveConnection,
msg: dict,
) -> None:
"""Create an All-Link Database record for an Insteon device."""
device = devices[msg[DEVICE_ADDRESS]]
if not device:
notify_device_not_found(connection, msg, INSTEON_DEVICE_NOT_FOUND)
return
record = msg[ALDB_RECORD]
device.aldb.add(
group=record["group"],
controller=record["is_controller"],
target=record["target"],
data1=record["data1"],
data2=record["data2"],
data3=record["data3"],
)
connection.send_result(msg[ID])
@websocket_api.websocket_command(
{
vol.Required(TYPE): "insteon/aldb/write",
vol.Required(DEVICE_ADDRESS): str,
}
)
@websocket_api.require_admin
@websocket_api.async_response
async def websocket_write_aldb(
hass: HomeAssistant,
connection: websocket_api.connection.ActiveConnection,
msg: dict,
) -> None:
"""Create an All-Link Database record for an Insteon device."""
device = devices[msg[DEVICE_ADDRESS]]
if not device:
notify_device_not_found(connection, msg, INSTEON_DEVICE_NOT_FOUND)
return
await device.aldb.async_write()
hass.async_create_task(async_reload_and_save_aldb(hass, device))
connection.send_result(msg[ID])
@websocket_api.websocket_command(
{
vol.Required(TYPE): "insteon/aldb/load",
vol.Required(DEVICE_ADDRESS): str,
}
)
@websocket_api.require_admin
@websocket_api.async_response
async def websocket_load_aldb(
hass: HomeAssistant,
connection: websocket_api.connection.ActiveConnection,
msg: dict,
) -> None:
"""Create an All-Link Database record for an Insteon device."""
device = devices[msg[DEVICE_ADDRESS]]
if not device:
notify_device_not_found(connection, msg, INSTEON_DEVICE_NOT_FOUND)
return
hass.async_create_task(async_reload_and_save_aldb(hass, device))
connection.send_result(msg[ID])
@websocket_api.websocket_command(
{
vol.Required(TYPE): "insteon/aldb/reset",
vol.Required(DEVICE_ADDRESS): str,
}
)
@websocket_api.require_admin
@websocket_api.async_response
async def websocket_reset_aldb(
hass: HomeAssistant,
connection: websocket_api.connection.ActiveConnection,
msg: dict,
) -> None:
"""Create an All-Link Database record for an Insteon device."""
device = devices[msg[DEVICE_ADDRESS]]
if not device:
notify_device_not_found(connection, msg, INSTEON_DEVICE_NOT_FOUND)
return
device.aldb.clear_pending()
connection.send_result(msg[ID])
@websocket_api.websocket_command(
{
vol.Required(TYPE): "insteon/aldb/add_default_links",
vol.Required(DEVICE_ADDRESS): str,
}
)
@websocket_api.require_admin
@websocket_api.async_response
async def websocket_add_default_links(
hass: HomeAssistant,
connection: websocket_api.connection.ActiveConnection,
msg: dict,
) -> None:
"""Add the default All-Link Database records for an Insteon device."""
device = devices[msg[DEVICE_ADDRESS]]
if not device:
notify_device_not_found(connection, msg, INSTEON_DEVICE_NOT_FOUND)
return
device.aldb.clear_pending()
await device.async_add_default_links()
hass.async_create_task(async_reload_and_save_aldb(hass, device))
connection.send_result(msg[ID])
@websocket_api.websocket_command(
{
vol.Required(TYPE): "insteon/aldb/notify",
vol.Required(DEVICE_ADDRESS): str,
}
)
@websocket_api.require_admin
@websocket_api.async_response
async def websocket_notify_on_aldb_status(
hass: HomeAssistant,
connection: websocket_api.connection.ActiveConnection,
msg: dict,
) -> None:
"""Tell Insteon a new ALDB record was added."""
device = devices[msg[DEVICE_ADDRESS]]
if not device:
notify_device_not_found(connection, msg, INSTEON_DEVICE_NOT_FOUND)
return
@callback
def record_added(controller, responder, group):
"""Forward ALDB events to websocket."""
forward_data = {"type": "record_loaded"}
connection.send_message(websocket_api.event_message(msg["id"], forward_data))
@callback
def aldb_loaded():
"""Forward ALDB loaded event to websocket."""
forward_data = {
"type": "status_changed",
"is_loading": device.aldb.status == ALDBStatus.LOADING,
}
connection.send_message(websocket_api.event_message(msg["id"], forward_data))
@callback
def async_cleanup() -> None:
"""Remove signal listeners."""
unsubscribe_topic(record_added, f"{DEVICE_LINK_CONTROLLER_CREATED}.{device.id}")
unsubscribe_topic(record_added, f"{DEVICE_LINK_RESPONDER_CREATED}.{device.id}")
unsubscribe_topic(aldb_loaded, f"{device.id}.{ALDB_STATUS_CHANGED}")
forward_data = {"type": "unsubscribed"}
connection.send_message(websocket_api.event_message(msg["id"], forward_data))
connection.subscriptions[msg["id"]] = async_cleanup
subscribe_topic(record_added, f"{DEVICE_LINK_CONTROLLER_CREATED}.{device.id}")
subscribe_topic(record_added, f"{DEVICE_LINK_RESPONDER_CREATED}.{device.id}")
subscribe_topic(aldb_loaded, f"{device.id}.{ALDB_STATUS_CHANGED}")
connection.send_result(msg[ID])

View file

@ -0,0 +1,79 @@
"""API interface to get an Insteon device."""
from pyinsteon import devices
import voluptuous as vol
from homeassistant.components import websocket_api
from homeassistant.core import HomeAssistant
from ..const import (
DEVICE_ID,
DOMAIN,
HA_DEVICE_NOT_FOUND,
ID,
INSTEON_DEVICE_NOT_FOUND,
TYPE,
)
def compute_device_name(ha_device):
"""Return the HA device name."""
return ha_device.name_by_user if ha_device.name_by_user else ha_device.name
def get_insteon_device_from_ha_device(ha_device):
"""Return the Insteon device from an HA device."""
for identifier in ha_device.identifiers:
if len(identifier) > 1 and identifier[0] == DOMAIN and devices[identifier[1]]:
return devices[identifier[1]]
return None
async def async_device_name(dev_registry, address):
"""Get the Insteon device name from a device registry id."""
ha_device = dev_registry.async_get_device(
identifiers={(DOMAIN, str(address))}, connections=set()
)
if not ha_device:
device = devices[address]
if device:
return f"{device.description} ({device.model})"
return ""
return compute_device_name(ha_device)
def notify_device_not_found(connection, msg, text):
"""Notify the caller that the device was not found."""
connection.send_message(
websocket_api.error_message(msg[ID], websocket_api.const.ERR_NOT_FOUND, text)
)
@websocket_api.websocket_command(
{vol.Required(TYPE): "insteon/device/get", vol.Required(DEVICE_ID): str}
)
@websocket_api.require_admin
@websocket_api.async_response
async def websocket_get_device(
hass: HomeAssistant,
connection: websocket_api.connection.ActiveConnection,
msg: dict,
) -> None:
"""Get an Insteon device."""
dev_registry = await hass.helpers.device_registry.async_get_registry()
ha_device = dev_registry.async_get(msg[DEVICE_ID])
if not ha_device:
notify_device_not_found(connection, msg, HA_DEVICE_NOT_FOUND)
return
device = get_insteon_device_from_ha_device(ha_device)
if not device:
notify_device_not_found(connection, msg, INSTEON_DEVICE_NOT_FOUND)
return
ha_name = compute_device_name(ha_device)
device_info = {
"name": ha_name,
"address": str(device.address),
"is_battery": device.is_battery,
"aldb_status": str(device.aldb.status),
}
connection.send_result(msg[ID], device_info)

View file

@ -0,0 +1,420 @@
"""Property update methods and schemas."""
from itertools import chain
from pyinsteon import devices
from pyinsteon.constants import RAMP_RATES, ResponseStatus
from pyinsteon.device_types.device_base import Device
from pyinsteon.extended_property import (
NON_TOGGLE_MASK,
NON_TOGGLE_ON_OFF_MASK,
OFF_MASK,
ON_MASK,
RAMP_RATE,
)
from pyinsteon.utils import ramp_rate_to_seconds, seconds_to_ramp_rate
import voluptuous as vol
import voluptuous_serialize
from homeassistant.components import websocket_api
from homeassistant.core import HomeAssistant
import homeassistant.helpers.config_validation as cv
from ..const import (
DEVICE_ADDRESS,
ID,
INSTEON_DEVICE_NOT_FOUND,
PROPERTY_NAME,
PROPERTY_VALUE,
TYPE,
)
from .device import notify_device_not_found
TOGGLE_ON_OFF_MODE = "toggle_on_off_mode"
NON_TOGGLE_ON_MODE = "non_toggle_on_mode"
NON_TOGGLE_OFF_MODE = "non_toggle_off_mode"
RADIO_BUTTON_GROUP_PROP = "radio_button_group_"
TOGGLE_PROP = "toggle_"
RAMP_RATE_SECONDS = list(dict.fromkeys(RAMP_RATES.values()))
RAMP_RATE_SECONDS.sort()
TOGGLE_MODES = {TOGGLE_ON_OFF_MODE: 0, NON_TOGGLE_ON_MODE: 1, NON_TOGGLE_OFF_MODE: 2}
TOGGLE_MODES_SCHEMA = {
0: TOGGLE_ON_OFF_MODE,
1: NON_TOGGLE_ON_MODE,
2: NON_TOGGLE_OFF_MODE,
}
def _bool_schema(name):
return voluptuous_serialize.convert(vol.Schema({vol.Required(name): bool}))[0]
def _byte_schema(name):
return voluptuous_serialize.convert(vol.Schema({vol.Required(name): cv.byte}))[0]
def _ramp_rate_schema(name):
return voluptuous_serialize.convert(
vol.Schema({vol.Required(name): vol.In(RAMP_RATE_SECONDS)}),
custom_serializer=cv.custom_serializer,
)[0]
def get_properties(device: Device):
"""Get the properties of an Insteon device and return the records and schema."""
properties = []
schema = {}
# Limit the properties we manage at this time.
for prop_name in device.operating_flags:
if not device.operating_flags[prop_name].is_read_only:
prop_dict, schema_dict = _get_property(device.operating_flags[prop_name])
properties.append(prop_dict)
schema[prop_name] = schema_dict
mask_found = False
for prop_name in device.properties:
if device.properties[prop_name].is_read_only:
continue
if prop_name == RAMP_RATE:
rr_prop, rr_schema = _get_ramp_rate_property(device.properties[prop_name])
properties.append(rr_prop)
schema[RAMP_RATE] = rr_schema
elif not mask_found and "mask" in prop_name:
mask_found = True
toggle_props, toggle_schema = _get_toggle_properties(device)
properties.extend(toggle_props)
schema.update(toggle_schema)
rb_props, rb_schema = _get_radio_button_properties(device)
properties.extend(rb_props)
schema.update(rb_schema)
else:
prop_dict, schema_dict = _get_property(device.properties[prop_name])
properties.append(prop_dict)
schema[prop_name] = schema_dict
return properties, schema
def set_property(device, prop_name: str, value):
"""Update a property value."""
if isinstance(value, bool) and prop_name in device.operating_flags:
device.operating_flags[prop_name].new_value = value
elif prop_name == RAMP_RATE:
device.properties[prop_name].new_value = seconds_to_ramp_rate(value)
elif prop_name.startswith(RADIO_BUTTON_GROUP_PROP):
buttons = [int(button) for button in value]
rb_groups = _calc_radio_button_groups(device)
curr_group = int(prop_name[len(RADIO_BUTTON_GROUP_PROP) :])
if len(rb_groups) > curr_group:
removed = [btn for btn in rb_groups[curr_group] if btn not in buttons]
if removed:
device.clear_radio_buttons(removed)
if buttons:
device.set_radio_buttons(buttons)
elif prop_name.startswith(TOGGLE_PROP):
button_name = prop_name[len(TOGGLE_PROP) :]
for button in device.groups:
if device.groups[button].name == button_name:
device.set_toggle_mode(button, int(value))
else:
device.properties[prop_name].new_value = value
def _get_property(prop):
"""Return a property data row."""
value, modified = _get_usable_value(prop)
prop_dict = {"name": prop.name, "value": value, "modified": modified}
if isinstance(prop.value, bool):
schema = _bool_schema(prop.name)
else:
schema = _byte_schema(prop.name)
return prop_dict, {"name": prop.name, **schema}
def _get_toggle_properties(device):
"""Generate the mask properties for a KPL device."""
props = []
schema = {}
toggle_prop = device.properties[NON_TOGGLE_MASK]
toggle_on_prop = device.properties[NON_TOGGLE_ON_OFF_MASK]
for button in device.groups:
name = f"{TOGGLE_PROP}{device.groups[button].name}"
value, modified = _toggle_button_value(toggle_prop, toggle_on_prop, button)
props.append({"name": name, "value": value, "modified": modified})
toggle_schema = vol.Schema({vol.Required(name): vol.In(TOGGLE_MODES_SCHEMA)})
toggle_schema_dict = voluptuous_serialize.convert(
toggle_schema, custom_serializer=cv.custom_serializer
)
schema[name] = toggle_schema_dict[0]
return props, schema
def _toggle_button_value(non_toggle_prop, toggle_on_prop, button):
"""Determine the toggle value of a button."""
toggle_mask, toggle_modified = _get_usable_value(non_toggle_prop)
toggle_on_mask, toggle_on_modified = _get_usable_value(toggle_on_prop)
bit = button - 1
if not toggle_mask & 1 << bit:
value = 0
else:
if toggle_on_mask & 1 << bit:
value = 1
else:
value = 2
modified = False
if toggle_modified:
curr_bit = non_toggle_prop.value & 1 << bit
new_bit = non_toggle_prop.new_value & 1 << bit
modified = not curr_bit == new_bit
if not modified and value != 0 and toggle_on_modified:
curr_bit = toggle_on_prop.value & 1 << bit
new_bit = toggle_on_prop.new_value & 1 << bit
modified = not curr_bit == new_bit
return value, modified
def _get_radio_button_properties(device):
"""Return the values and schema to set KPL buttons as radio buttons."""
rb_groups = _calc_radio_button_groups(device)
props = []
schema = {}
index = 0
remaining_buttons = []
buttons_in_groups = list(chain.from_iterable(rb_groups))
# Identify buttons not belonging to any group
for button in device.groups:
if button not in buttons_in_groups:
remaining_buttons.append(button)
for rb_group in rb_groups:
name = f"{RADIO_BUTTON_GROUP_PROP}{index}"
button_1 = rb_group[0]
button_str = f"_{button_1}" if button_1 != 1 else ""
on_mask = device.properties[f"{ON_MASK}{button_str}"]
off_mask = device.properties[f"{OFF_MASK}{button_str}"]
modified = on_mask.is_dirty or off_mask.is_dirty
props.append(
{
"name": name,
"modified": modified,
"value": rb_group,
}
)
options = {
button: device.groups[button].name
for button in chain.from_iterable([rb_group, remaining_buttons])
}
rb_schema = vol.Schema({vol.Optional(name): cv.multi_select(options)})
rb_schema_dict = voluptuous_serialize.convert(
rb_schema, custom_serializer=cv.custom_serializer
)
schema[name] = rb_schema_dict[0]
index += 1
if len(remaining_buttons) > 1:
name = f"{RADIO_BUTTON_GROUP_PROP}{index}"
props.append(
{
"name": name,
"modified": False,
"value": [],
}
)
options = {button: device.groups[button].name for button in remaining_buttons}
rb_schema = vol.Schema({vol.Optional(name): cv.multi_select(options)})
rb_schema_dict = voluptuous_serialize.convert(
rb_schema, custom_serializer=cv.custom_serializer
)
schema[name] = rb_schema_dict[0]
return props, schema
def _calc_radio_button_groups(device):
"""Return existing radio button groups."""
rb_groups = []
for button in device.groups:
if button not in list(chain.from_iterable(rb_groups)):
button_str = "" if button == 1 else f"_{button}"
on_mask, _ = _get_usable_value(device.properties[f"{ON_MASK}{button_str}"])
if on_mask != 0:
rb_group = [button]
for bit in list(range(0, button - 1)) + list(range(button, 8)):
if on_mask & 1 << bit:
rb_group.append(bit + 1)
if len(rb_group) > 1:
rb_groups.append(rb_group)
return rb_groups
def _get_ramp_rate_property(prop):
"""Return the value and schema of a ramp rate property."""
rr_prop, _ = _get_property(prop)
rr_prop["value"] = ramp_rate_to_seconds(rr_prop["value"])
return rr_prop, _ramp_rate_schema(prop.name)
def _get_usable_value(prop):
"""Return the current or the modified value of a property."""
value = prop.value if prop.new_value is None else prop.new_value
return value, prop.is_dirty
@websocket_api.websocket_command(
{
vol.Required(TYPE): "insteon/properties/get",
vol.Required(DEVICE_ADDRESS): str,
}
)
@websocket_api.require_admin
@websocket_api.async_response
async def websocket_get_properties(
hass: HomeAssistant,
connection: websocket_api.connection.ActiveConnection,
msg: dict,
) -> None:
"""Add the default All-Link Database records for an Insteon device."""
device = devices[msg[DEVICE_ADDRESS]]
if not device:
notify_device_not_found(connection, msg, INSTEON_DEVICE_NOT_FOUND)
return
properties, schema = get_properties(device)
connection.send_result(msg[ID], {"properties": properties, "schema": schema})
@websocket_api.websocket_command(
{
vol.Required(TYPE): "insteon/properties/change",
vol.Required(DEVICE_ADDRESS): str,
vol.Required(PROPERTY_NAME): str,
vol.Required(PROPERTY_VALUE): vol.Any(list, int, float, bool, str),
}
)
@websocket_api.require_admin
@websocket_api.async_response
async def websocket_change_properties_record(
hass: HomeAssistant,
connection: websocket_api.connection.ActiveConnection,
msg: dict,
) -> None:
"""Add the default All-Link Database records for an Insteon device."""
device = devices[msg[DEVICE_ADDRESS]]
if not device:
notify_device_not_found(connection, msg, INSTEON_DEVICE_NOT_FOUND)
return
set_property(device, msg[PROPERTY_NAME], msg[PROPERTY_VALUE])
connection.send_result(msg[ID])
@websocket_api.websocket_command(
{
vol.Required(TYPE): "insteon/properties/write",
vol.Required(DEVICE_ADDRESS): str,
}
)
@websocket_api.require_admin
@websocket_api.async_response
async def websocket_write_properties(
hass: HomeAssistant,
connection: websocket_api.connection.ActiveConnection,
msg: dict,
) -> None:
"""Add the default All-Link Database records for an Insteon device."""
device = devices[msg[DEVICE_ADDRESS]]
if not device:
notify_device_not_found(connection, msg, INSTEON_DEVICE_NOT_FOUND)
return
result1 = await device.async_write_op_flags()
result2 = await device.async_write_ext_properties()
await devices.async_save(workdir=hass.config.config_dir)
if result1 != ResponseStatus.SUCCESS or result2 != ResponseStatus.SUCCESS:
connection.send_message(
websocket_api.error_message(
msg[ID], "write_failed", "properties not written to device"
)
)
return
connection.send_result(msg[ID])
@websocket_api.websocket_command(
{
vol.Required(TYPE): "insteon/properties/load",
vol.Required(DEVICE_ADDRESS): str,
}
)
@websocket_api.require_admin
@websocket_api.async_response
async def websocket_load_properties(
hass: HomeAssistant,
connection: websocket_api.connection.ActiveConnection,
msg: dict,
) -> None:
"""Add the default All-Link Database records for an Insteon device."""
device = devices[msg[DEVICE_ADDRESS]]
if not device:
notify_device_not_found(connection, msg, INSTEON_DEVICE_NOT_FOUND)
return
result1 = await device.async_read_op_flags()
result2 = await device.async_read_ext_properties()
await devices.async_save(workdir=hass.config.config_dir)
if result1 != ResponseStatus.SUCCESS or result2 != ResponseStatus.SUCCESS:
connection.send_message(
websocket_api.error_message(
msg[ID], "load_failed", "properties not loaded from device"
)
)
return
connection.send_result(msg[ID])
@websocket_api.websocket_command(
{
vol.Required(TYPE): "insteon/properties/reset",
vol.Required(DEVICE_ADDRESS): str,
}
)
@websocket_api.require_admin
@websocket_api.async_response
async def websocket_reset_properties(
hass: HomeAssistant,
connection: websocket_api.connection.ActiveConnection,
msg: dict,
) -> None:
"""Add the default All-Link Database records for an Insteon device."""
device = devices[msg[DEVICE_ADDRESS]]
if not device:
notify_device_not_found(connection, msg, INSTEON_DEVICE_NOT_FOUND)
return
for prop in device.operating_flags:
device.operating_flags[prop].new_value = None
for prop in device.properties:
device.properties[prop].new_value = None
connection.send_result(msg[ID])

View file

@ -1,4 +1,6 @@
"""Constants used by insteon component.""" """Constants used by insteon component."""
import re
from pyinsteon.groups import ( from pyinsteon.groups import (
CO_SENSOR, CO_SENSOR,
COVER, COVER,
@ -158,3 +160,15 @@ STATE_NAME_LABEL_MAP = {
COVER: "Cover", COVER: "Cover",
RELAY: "Relay", RELAY: "Relay",
} }
TYPE = "type"
ID = "id"
DEVICE_ID = "device_id"
DEVICE_ADDRESS = "device_address"
ALDB_RECORD = "record"
PROPERTY_NAME = "name"
PROPERTY_VALUE = "value"
HA_DEVICE_NOT_FOUND = "ha_device_not_found"
INSTEON_DEVICE_NOT_FOUND = "insteon_device_not_found"
INSTEON_ADDR_REGEX = re.compile(r"([A-Fa-f0-9]{2}\.?[A-Fa-f0-9]{2}\.?[A-Fa-f0-9]{2})$")

View file

@ -10,4 +10,4 @@
], ],
"config_flow": true, "config_flow": true,
"iot_class": "local_push" "iot_class": "local_push"
} }

View file

@ -40,6 +40,7 @@ from .const import (
CONF_X10_ALL_UNITS_OFF, CONF_X10_ALL_UNITS_OFF,
DOMAIN, DOMAIN,
HOUSECODES, HOUSECODES,
INSTEON_ADDR_REGEX,
PORT_HUB_V1, PORT_HUB_V1,
PORT_HUB_V2, PORT_HUB_V2,
SRV_ALL_LINK_GROUP, SRV_ALL_LINK_GROUP,
@ -64,6 +65,13 @@ def set_default_port(schema: dict) -> dict:
return schema return schema
def insteon_address(value: str) -> str:
"""Validate an Insteon address."""
if not INSTEON_ADDR_REGEX.match(value):
raise vol.Invalid("Invalid Insteon Address")
return str(value).replace(".", "").lower()
CONF_DEVICE_OVERRIDE_SCHEMA = vol.All( CONF_DEVICE_OVERRIDE_SCHEMA = vol.All(
vol.Schema( vol.Schema(
{ {

View file

@ -0,0 +1,11 @@
"""Mock connections for Insteon."""
async def mock_successful_connection(*args, **kwargs):
"""Return a successful connection."""
return True
async def mock_failed_connection(*args, **kwargs):
"""Return a failed connection."""
raise ConnectionError("Connection failed")

View file

@ -2,11 +2,14 @@
from unittest.mock import AsyncMock, MagicMock from unittest.mock import AsyncMock, MagicMock
from pyinsteon.address import Address from pyinsteon.address import Address
from pyinsteon.constants import ALDBStatus, ResponseStatus
from pyinsteon.device_types import ( from pyinsteon.device_types import (
GeneralController_MiniRemote_4, DimmableLightingControl_KeypadLinc_8,
GeneralController,
Hub, Hub,
SwitchedLightingControl_SwitchLinc, SwitchedLightingControl_SwitchLinc,
) )
from pyinsteon.managers.saved_devices_manager import dict_to_aldb_record
class MockSwitchLinc(SwitchedLightingControl_SwitchLinc): class MockSwitchLinc(SwitchedLightingControl_SwitchLinc):
@ -32,7 +35,7 @@ class MockDevices:
def __getitem__(self, address): def __getitem__(self, address):
"""Return a a device from the device address.""" """Return a a device from the device address."""
return self._devices.get(address) return self._devices.get(Address(address))
def __iter__(self): def __iter__(self):
"""Return an iterator of device addresses.""" """Return an iterator of device addresses."""
@ -53,13 +56,73 @@ class MockDevices:
addr1 = Address("11.11.11") addr1 = Address("11.11.11")
addr2 = Address("22.22.22") addr2 = Address("22.22.22")
addr3 = Address("33.33.33") addr3 = Address("33.33.33")
self._devices[addr0] = Hub(addr0) self._devices[addr0] = Hub(addr0, 0x03, 0x00, 0x00, "Hub AA.AA.AA", "0")
self._devices[addr1] = MockSwitchLinc(addr1, 0x02, 0x00) self._devices[addr1] = MockSwitchLinc(
self._devices[addr2] = GeneralController_MiniRemote_4(addr2, 0x00, 0x00) addr1, 0x02, 0x00, 0x00, "Device 11.11.11", "1"
self._devices[addr3] = SwitchedLightingControl_SwitchLinc(addr3, 0x02, 0x00) )
self._devices[addr2] = GeneralController(
addr2, 0x00, 0x00, 0x00, "Device 22.22.22", "2"
)
self._devices[addr3] = DimmableLightingControl_KeypadLinc_8(
addr3, 0x02, 0x00, 0x00, "Device 33.33.33", "3"
)
for device in [self._devices[addr] for addr in [addr1, addr2, addr3]]: for device in [self._devices[addr] for addr in [addr1, addr2, addr3]]:
device.async_read_config = AsyncMock() device.async_read_config = AsyncMock()
device.aldb.async_write = AsyncMock()
device.aldb.async_load = AsyncMock()
device.async_add_default_links = AsyncMock()
device.async_read_op_flags = AsyncMock(
return_value=ResponseStatus.SUCCESS
)
device.async_read_ext_properties = AsyncMock(
return_value=ResponseStatus.SUCCESS
)
device.async_write_op_flags = AsyncMock(
return_value=ResponseStatus.SUCCESS
)
device.async_write_ext_properties = AsyncMock(
return_value=ResponseStatus.SUCCESS
)
for device in [self._devices[addr] for addr in [addr2, addr3]]: for device in [self._devices[addr] for addr in [addr2, addr3]]:
device.async_status = AsyncMock() device.async_status = AsyncMock()
self._devices[addr1].async_status = AsyncMock(side_effect=AttributeError) self._devices[addr1].async_status = AsyncMock(side_effect=AttributeError)
self._devices[addr0].aldb.async_load = AsyncMock()
self._devices[addr2].async_read_op_flags = AsyncMock(
return_value=ResponseStatus.FAILURE
)
self._devices[addr2].async_read_ext_properties = AsyncMock(
return_value=ResponseStatus.FAILURE
)
self._devices[addr2].async_write_op_flags = AsyncMock(
return_value=ResponseStatus.FAILURE
)
self._devices[addr2].async_write_ext_properties = AsyncMock(
return_value=ResponseStatus.FAILURE
)
self.modem = self._devices[addr0] self.modem = self._devices[addr0]
def fill_aldb(self, address, records):
"""Fill the All-Link Database for a device."""
device = self._devices[Address(address)]
aldb_records = dict_to_aldb_record(records)
device.aldb.load_saved_records(ALDBStatus.LOADED, aldb_records)
def fill_properties(self, address, props_dict):
"""Fill the operating flags and extended properties of a device."""
device = self._devices[Address(address)]
operating_flags = props_dict.get("operating_flags", {})
properties = props_dict.get("properties", {})
for flag in operating_flags:
value = operating_flags[flag]
if device.operating_flags.get(flag):
device.operating_flags[flag].load(value)
for flag in properties:
value = properties[flag]
if device.properties.get(flag):
device.properties[flag].load(value)

View file

@ -0,0 +1,288 @@
"""Test the Insteon All-Link Database APIs."""
import json
from unittest.mock import patch
from pyinsteon import pub
from pyinsteon.address import Address
from pyinsteon.topics import ALDB_STATUS_CHANGED, DEVICE_LINK_CONTROLLER_CREATED
import pytest
from homeassistant.components import insteon
from homeassistant.components.insteon.api import async_load_api
from homeassistant.components.insteon.api.aldb import (
ALDB_RECORD,
DEVICE_ADDRESS,
ID,
TYPE,
)
from homeassistant.components.insteon.api.device import INSTEON_DEVICE_NOT_FOUND
from .mock_devices import MockDevices
from tests.common import load_fixture
@pytest.fixture(name="aldb_data", scope="session")
def aldb_data_fixture():
"""Load the controller state fixture data."""
return json.loads(load_fixture("insteon/aldb_data.json"))
async def _setup(hass, hass_ws_client, aldb_data):
"""Set up tests."""
ws_client = await hass_ws_client(hass)
devices = MockDevices()
await devices.async_load()
async_load_api(hass)
devices.fill_aldb("33.33.33", aldb_data)
return ws_client, devices
def _compare_records(aldb_rec, dict_rec):
"""Compare a record in the ALDB to the dictionary record."""
assert aldb_rec.is_in_use == dict_rec["in_use"]
assert aldb_rec.is_controller == (dict_rec["is_controller"])
assert not aldb_rec.is_high_water_mark
assert aldb_rec.group == dict_rec["group"]
assert aldb_rec.target == Address(dict_rec["target"])
assert aldb_rec.data1 == dict_rec["data1"]
assert aldb_rec.data2 == dict_rec["data2"]
assert aldb_rec.data3 == dict_rec["data3"]
def _aldb_dict(mem_addr):
"""Generate an ALDB record as a dictionary."""
return {
"mem_addr": mem_addr,
"in_use": True,
"is_controller": True,
"highwater": False,
"group": 100,
"target": "111111",
"data1": 101,
"data2": 102,
"data3": 103,
"dirty": True,
}
async def test_get_aldb(hass, hass_ws_client, aldb_data):
"""Test getting an Insteon device's All-Link Database."""
ws_client, devices = await _setup(hass, hass_ws_client, aldb_data)
with patch.object(insteon.api.aldb, "devices", devices):
await ws_client.send_json(
{ID: 2, TYPE: "insteon/aldb/get", DEVICE_ADDRESS: "33.33.33"}
)
msg = await ws_client.receive_json()
result = msg["result"]
assert len(result) == 5
async def test_change_aldb_record(hass, hass_ws_client, aldb_data):
"""Test changing an Insteon device's All-Link Database record."""
ws_client, devices = await _setup(hass, hass_ws_client, aldb_data)
change_rec = _aldb_dict(4079)
with patch.object(insteon.api.aldb, "devices", devices):
await ws_client.send_json(
{
ID: 2,
TYPE: "insteon/aldb/change",
DEVICE_ADDRESS: "33.33.33",
ALDB_RECORD: change_rec,
}
)
msg = await ws_client.receive_json()
assert msg["success"]
assert len(devices["33.33.33"].aldb.pending_changes) == 1
rec = devices["33.33.33"].aldb.pending_changes[4079]
_compare_records(rec, change_rec)
async def test_create_aldb_record(hass, hass_ws_client, aldb_data):
"""Test creating a new Insteon All-Link Database record."""
ws_client, devices = await _setup(hass, hass_ws_client, aldb_data)
new_rec = _aldb_dict(4079)
with patch.object(insteon.api.aldb, "devices", devices):
await ws_client.send_json(
{
ID: 2,
TYPE: "insteon/aldb/create",
DEVICE_ADDRESS: "33.33.33",
ALDB_RECORD: new_rec,
}
)
msg = await ws_client.receive_json()
assert msg["success"]
assert len(devices["33.33.33"].aldb.pending_changes) == 1
rec = devices["33.33.33"].aldb.pending_changes[-1]
_compare_records(rec, new_rec)
async def test_write_aldb(hass, hass_ws_client, aldb_data):
"""Test writing an Insteon device's All-Link Database."""
ws_client, devices = await _setup(hass, hass_ws_client, aldb_data)
with patch.object(insteon.api.aldb, "devices", devices):
await ws_client.send_json(
{
ID: 2,
TYPE: "insteon/aldb/write",
DEVICE_ADDRESS: "33.33.33",
}
)
msg = await ws_client.receive_json()
assert msg["success"]
assert devices["33.33.33"].aldb.async_write.call_count == 1
assert devices["33.33.33"].aldb.async_load.call_count == 1
assert devices.async_save.call_count == 1
async def test_load_aldb(hass, hass_ws_client, aldb_data):
"""Test loading an Insteon device's All-Link Database."""
ws_client, devices = await _setup(hass, hass_ws_client, aldb_data)
with patch.object(insteon.api.aldb, "devices", devices):
await ws_client.send_json(
{
ID: 2,
TYPE: "insteon/aldb/load",
DEVICE_ADDRESS: "AA.AA.AA",
}
)
msg = await ws_client.receive_json()
assert msg["success"]
assert devices["AA.AA.AA"].aldb.async_load.call_count == 1
assert devices.async_save.call_count == 1
async def test_reset_aldb(hass, hass_ws_client, aldb_data):
"""Test resetting an Insteon device's All-Link Database."""
ws_client, devices = await _setup(hass, hass_ws_client, aldb_data)
record = _aldb_dict(4079)
devices["33.33.33"].aldb.modify(
mem_addr=record["mem_addr"],
in_use=record["in_use"],
group=record["group"],
controller=record["is_controller"],
target=record["target"],
data1=record["data1"],
data2=record["data2"],
data3=record["data3"],
)
assert devices["33.33.33"].aldb.pending_changes
with patch.object(insteon.api.aldb, "devices", devices):
await ws_client.send_json(
{
ID: 2,
TYPE: "insteon/aldb/reset",
DEVICE_ADDRESS: "33.33.33",
}
)
msg = await ws_client.receive_json()
assert msg["success"]
assert not devices["33.33.33"].aldb.pending_changes
async def test_default_links(hass, hass_ws_client, aldb_data):
"""Test getting an Insteon device's All-Link Database."""
ws_client, devices = await _setup(hass, hass_ws_client, aldb_data)
with patch.object(insteon.api.aldb, "devices", devices):
await ws_client.send_json(
{
ID: 2,
TYPE: "insteon/aldb/add_default_links",
DEVICE_ADDRESS: "33.33.33",
}
)
msg = await ws_client.receive_json()
assert msg["success"]
assert devices["33.33.33"].async_add_default_links.call_count == 1
assert devices["33.33.33"].aldb.async_load.call_count == 1
assert devices.async_save.call_count == 1
async def test_notify_on_aldb_status(hass, hass_ws_client, aldb_data):
"""Test getting an Insteon device's All-Link Database."""
ws_client, devices = await _setup(hass, hass_ws_client, aldb_data)
with patch.object(insteon.api.aldb, "devices", devices):
await ws_client.send_json(
{
ID: 2,
TYPE: "insteon/aldb/notify",
DEVICE_ADDRESS: "33.33.33",
}
)
msg = await ws_client.receive_json()
assert msg["success"]
pub.sendMessage(f"333333.{ALDB_STATUS_CHANGED}")
msg = await ws_client.receive_json()
assert msg["event"]["type"] == "status_changed"
assert not msg["event"]["is_loading"]
async def test_notify_on_aldb_record_added(hass, hass_ws_client, aldb_data):
"""Test getting an Insteon device's All-Link Database."""
ws_client, devices = await _setup(hass, hass_ws_client, aldb_data)
with patch.object(insteon.api.aldb, "devices", devices):
await ws_client.send_json(
{
ID: 2,
TYPE: "insteon/aldb/notify",
DEVICE_ADDRESS: "33.33.33",
}
)
msg = await ws_client.receive_json()
assert msg["success"]
pub.sendMessage(
f"{DEVICE_LINK_CONTROLLER_CREATED}.333333",
controller=Address("11.11.11"),
responder=Address("33.33.33"),
group=100,
)
msg = await ws_client.receive_json()
assert msg["event"]["type"] == "record_loaded"
async def test_bad_address(hass, hass_ws_client, aldb_data):
"""Test for a bad Insteon address."""
ws_client, _ = await _setup(hass, hass_ws_client, aldb_data)
record = _aldb_dict(0)
ws_id = 0
for call in ["get", "write", "load", "reset", "add_default_links", "notify"]:
ws_id += 1
await ws_client.send_json(
{
ID: ws_id,
TYPE: f"insteon/aldb/{call}",
DEVICE_ADDRESS: "99.99.99",
}
)
msg = await ws_client.receive_json()
assert not msg["success"]
assert msg["error"]["message"] == INSTEON_DEVICE_NOT_FOUND
for call in ["change", "create"]:
ws_id += 1
await ws_client.send_json(
{
ID: ws_id,
TYPE: f"insteon/aldb/{call}",
DEVICE_ADDRESS: "99.99.99",
ALDB_RECORD: record,
}
)
msg = await ws_client.receive_json()
assert not msg["success"]
assert msg["error"]["message"] == INSTEON_DEVICE_NOT_FOUND

View file

@ -0,0 +1,139 @@
"""Test the device level APIs."""
from unittest.mock import patch
from homeassistant.components import insteon
from homeassistant.components.insteon.api import async_load_api
from homeassistant.components.insteon.api.device import (
DEVICE_ID,
HA_DEVICE_NOT_FOUND,
ID,
INSTEON_DEVICE_NOT_FOUND,
TYPE,
async_device_name,
)
from homeassistant.components.insteon.const import DOMAIN
from homeassistant.helpers.device_registry import async_get_registry
from .const import MOCK_USER_INPUT_PLM
from .mock_devices import MockDevices
from tests.common import MockConfigEntry
async def _async_setup(hass, hass_ws_client):
"""Set up for tests."""
config_entry = MockConfigEntry(
domain=DOMAIN,
entry_id="abcde12345",
data=MOCK_USER_INPUT_PLM,
options={},
)
config_entry.add_to_hass(hass)
async_load_api(hass)
ws_client = await hass_ws_client(hass)
devices = MockDevices()
await devices.async_load()
dev_reg = await async_get_registry(hass)
# Create device registry entry for mock node
ha_device = dev_reg.async_get_or_create(
config_entry_id=config_entry.entry_id,
identifiers={(DOMAIN, "11.11.11")},
name="Device 11.11.11",
)
return ws_client, devices, ha_device, dev_reg
async def test_get_device_api(hass, hass_ws_client):
"""Test getting an Insteon device."""
ws_client, devices, ha_device, _ = await _async_setup(hass, hass_ws_client)
with patch.object(insteon.api.device, "devices", devices):
await ws_client.send_json(
{ID: 2, TYPE: "insteon/device/get", DEVICE_ID: ha_device.id}
)
msg = await ws_client.receive_json()
result = msg["result"]
assert result["name"] == "Device 11.11.11"
assert result["address"] == "11.11.11"
async def test_no_ha_device(hass, hass_ws_client):
"""Test response when no HA device exists."""
ws_client, devices, _, _ = await _async_setup(hass, hass_ws_client)
with patch.object(insteon.api.device, "devices", devices):
await ws_client.send_json(
{ID: 2, TYPE: "insteon/device/get", DEVICE_ID: "not_a_device"}
)
msg = await ws_client.receive_json()
assert not msg.get("result")
assert msg.get("error")
assert msg["error"]["message"] == HA_DEVICE_NOT_FOUND
async def test_no_insteon_device(hass, hass_ws_client):
"""Test response when no Insteon device exists."""
config_entry = MockConfigEntry(
domain=DOMAIN,
entry_id="abcde12345",
data=MOCK_USER_INPUT_PLM,
options={},
)
config_entry.add_to_hass(hass)
async_load_api(hass)
ws_client = await hass_ws_client(hass)
devices = MockDevices()
await devices.async_load()
dev_reg = await async_get_registry(hass)
# Create device registry entry for a Insteon device not in the Insteon devices list
ha_device_1 = dev_reg.async_get_or_create(
config_entry_id=config_entry.entry_id,
identifiers={(DOMAIN, "AA.BB.CC")},
name="HA Device Only",
)
# Create device registry entry for a non-Insteon device
ha_device_2 = dev_reg.async_get_or_create(
config_entry_id=config_entry.entry_id,
identifiers={("other_domain", "no address")},
name="HA Device Only",
)
with patch.object(insteon.api.device, "devices", devices):
await ws_client.send_json(
{ID: 2, TYPE: "insteon/device/get", DEVICE_ID: ha_device_1.id}
)
msg = await ws_client.receive_json()
assert not msg.get("result")
assert msg.get("error")
assert msg["error"]["message"] == INSTEON_DEVICE_NOT_FOUND
await ws_client.send_json(
{ID: 3, TYPE: "insteon/device/get", DEVICE_ID: ha_device_2.id}
)
msg = await ws_client.receive_json()
assert not msg.get("result")
assert msg.get("error")
assert msg["error"]["message"] == INSTEON_DEVICE_NOT_FOUND
async def test_get_ha_device_name(hass, hass_ws_client):
"""Test getting the HA device name from an Insteon address."""
_, devices, _, device_reg = await _async_setup(hass, hass_ws_client)
with patch.object(insteon.api.device, "devices", devices):
# Test a real HA and Insteon device
name = await async_device_name(device_reg, "11.11.11")
assert name == "Device 11.11.11"
# Test no HA device but a real Insteon device
name = await async_device_name(device_reg, "22.22.22")
assert name == "Device 22.22.22 (2)"
# Test no HA or Insteon device
name = await async_device_name(device_reg, "BB.BB.BB")
assert name == ""

View file

@ -0,0 +1,425 @@
"""Test the Insteon properties APIs."""
import json
from unittest.mock import patch
import pytest
from homeassistant.components import insteon
from homeassistant.components.insteon.api import async_load_api
from homeassistant.components.insteon.api.device import INSTEON_DEVICE_NOT_FOUND
from homeassistant.components.insteon.api.properties import (
DEVICE_ADDRESS,
ID,
NON_TOGGLE_MASK,
NON_TOGGLE_OFF_MODE,
NON_TOGGLE_ON_MODE,
NON_TOGGLE_ON_OFF_MASK,
PROPERTY_NAME,
PROPERTY_VALUE,
RADIO_BUTTON_GROUP_PROP,
TOGGLE_MODES,
TOGGLE_ON_OFF_MODE,
TOGGLE_PROP,
TYPE,
_get_radio_button_properties,
_get_toggle_properties,
)
from .mock_devices import MockDevices
from tests.common import load_fixture
@pytest.fixture(name="properties_data", scope="session")
def aldb_data_fixture():
"""Load the controller state fixture data."""
return json.loads(load_fixture("insteon/kpl_properties.json"))
async def _setup(hass, hass_ws_client, properties_data):
"""Set up tests."""
ws_client = await hass_ws_client(hass)
devices = MockDevices()
await devices.async_load()
devices.fill_properties("33.33.33", properties_data)
async_load_api(hass)
return ws_client, devices
async def test_get_properties(hass, hass_ws_client, properties_data):
"""Test getting an Insteon device's properties."""
ws_client, devices = await _setup(hass, hass_ws_client, properties_data)
with patch.object(insteon.api.properties, "devices", devices):
await ws_client.send_json(
{ID: 2, TYPE: "insteon/properties/get", DEVICE_ADDRESS: "33.33.33"}
)
msg = await ws_client.receive_json()
assert msg["success"]
assert len(msg["result"]["properties"]) == 54
async def test_change_operating_flag(hass, hass_ws_client, properties_data):
"""Test changing an Insteon device's properties."""
ws_client, devices = await _setup(hass, hass_ws_client, properties_data)
with patch.object(insteon.api.properties, "devices", devices):
await ws_client.send_json(
{
ID: 2,
TYPE: "insteon/properties/change",
DEVICE_ADDRESS: "33.33.33",
PROPERTY_NAME: "led_off",
PROPERTY_VALUE: True,
}
)
msg = await ws_client.receive_json()
assert msg["success"]
assert devices["33.33.33"].operating_flags["led_off"].is_dirty
async def test_change_property(hass, hass_ws_client, properties_data):
"""Test changing an Insteon device's properties."""
ws_client, devices = await _setup(hass, hass_ws_client, properties_data)
with patch.object(insteon.api.properties, "devices", devices):
await ws_client.send_json(
{
ID: 2,
TYPE: "insteon/properties/change",
DEVICE_ADDRESS: "33.33.33",
PROPERTY_NAME: "on_mask",
PROPERTY_VALUE: 100,
}
)
msg = await ws_client.receive_json()
assert msg["success"]
assert devices["33.33.33"].properties["on_mask"].new_value == 100
assert devices["33.33.33"].properties["on_mask"].is_dirty
async def test_change_ramp_rate_property(hass, hass_ws_client, properties_data):
"""Test changing an Insteon device's properties."""
ws_client, devices = await _setup(hass, hass_ws_client, properties_data)
with patch.object(insteon.api.properties, "devices", devices):
await ws_client.send_json(
{
ID: 2,
TYPE: "insteon/properties/change",
DEVICE_ADDRESS: "33.33.33",
PROPERTY_NAME: "ramp_rate",
PROPERTY_VALUE: 4.5,
}
)
msg = await ws_client.receive_json()
assert msg["success"]
assert devices["33.33.33"].properties["ramp_rate"].new_value == 0x1A
assert devices["33.33.33"].properties["ramp_rate"].is_dirty
async def test_change_radio_button_group(hass, hass_ws_client, properties_data):
"""Test changing an Insteon device's properties."""
ws_client, devices = await _setup(hass, hass_ws_client, properties_data)
rb_props, schema = _get_radio_button_properties(devices["33.33.33"])
# Make sure the baseline is correct
assert rb_props[0]["name"] == f"{RADIO_BUTTON_GROUP_PROP}0"
assert rb_props[0]["value"] == [4, 5]
assert rb_props[1]["value"] == [7, 8]
assert rb_props[2]["value"] == []
assert schema[f"{RADIO_BUTTON_GROUP_PROP}0"]["options"].get(1)
assert schema[f"{RADIO_BUTTON_GROUP_PROP}1"]["options"].get(1)
assert devices["33.33.33"].properties["on_mask"].value == 0
assert devices["33.33.33"].properties["off_mask"].value == 0
assert not devices["33.33.33"].properties["on_mask"].is_dirty
assert not devices["33.33.33"].properties["off_mask"].is_dirty
# Add button 1 to the group
rb_props[0]["value"].append(1)
with patch.object(insteon.api.properties, "devices", devices):
await ws_client.send_json(
{
ID: 2,
TYPE: "insteon/properties/change",
DEVICE_ADDRESS: "33.33.33",
PROPERTY_NAME: f"{RADIO_BUTTON_GROUP_PROP}0",
PROPERTY_VALUE: rb_props[0]["value"],
}
)
msg = await ws_client.receive_json()
assert msg["success"]
new_rb_props, _ = _get_radio_button_properties(devices["33.33.33"])
assert 1 in new_rb_props[0]["value"]
assert 4 in new_rb_props[0]["value"]
assert 5 in new_rb_props[0]["value"]
assert schema[f"{RADIO_BUTTON_GROUP_PROP}0"]["options"].get(1)
assert schema[f"{RADIO_BUTTON_GROUP_PROP}1"]["options"].get(1)
assert devices["33.33.33"].properties["on_mask"].new_value == 0x18
assert devices["33.33.33"].properties["off_mask"].new_value == 0x18
assert devices["33.33.33"].properties["on_mask"].is_dirty
assert devices["33.33.33"].properties["off_mask"].is_dirty
# Remove button 5
rb_props[0]["value"].remove(5)
await ws_client.send_json(
{
ID: 3,
TYPE: "insteon/properties/change",
DEVICE_ADDRESS: "33.33.33",
PROPERTY_NAME: f"{RADIO_BUTTON_GROUP_PROP}0",
PROPERTY_VALUE: rb_props[0]["value"],
}
)
msg = await ws_client.receive_json()
assert msg["success"]
new_rb_props, _ = _get_radio_button_properties(devices["33.33.33"])
assert 1 in new_rb_props[0]["value"]
assert 4 in new_rb_props[0]["value"]
assert 5 not in new_rb_props[0]["value"]
assert schema[f"{RADIO_BUTTON_GROUP_PROP}0"]["options"].get(1)
assert schema[f"{RADIO_BUTTON_GROUP_PROP}1"]["options"].get(1)
assert devices["33.33.33"].properties["on_mask"].new_value == 0x08
assert devices["33.33.33"].properties["off_mask"].new_value == 0x08
assert devices["33.33.33"].properties["on_mask"].is_dirty
assert devices["33.33.33"].properties["off_mask"].is_dirty
# Remove button group 1
rb_props[1]["value"] = []
await ws_client.send_json(
{
ID: 5,
TYPE: "insteon/properties/change",
DEVICE_ADDRESS: "33.33.33",
PROPERTY_NAME: f"{RADIO_BUTTON_GROUP_PROP}1",
PROPERTY_VALUE: rb_props[1]["value"],
}
)
msg = await ws_client.receive_json()
assert msg["success"]
new_rb_props, _ = _get_radio_button_properties(devices["33.33.33"])
assert len(new_rb_props) == 2
assert new_rb_props[0]["value"] == [1, 4]
assert new_rb_props[1]["value"] == []
async def test_create_radio_button_group(hass, hass_ws_client, properties_data):
"""Test changing an Insteon device's properties."""
ws_client, devices = await _setup(hass, hass_ws_client, properties_data)
rb_props, _ = _get_radio_button_properties(devices["33.33.33"])
# Make sure the baseline is correct
assert len(rb_props) == 3
print(rb_props)
rb_props[0]["value"].append("1")
with patch.object(insteon.api.properties, "devices", devices):
await ws_client.send_json(
{
ID: 2,
TYPE: "insteon/properties/change",
DEVICE_ADDRESS: "33.33.33",
PROPERTY_NAME: f"{RADIO_BUTTON_GROUP_PROP}2",
PROPERTY_VALUE: ["1", "3"],
}
)
msg = await ws_client.receive_json()
assert msg["success"]
new_rb_props, new_schema = _get_radio_button_properties(devices["33.33.33"])
assert len(new_rb_props) == 4
assert 1 in new_rb_props[0]["value"]
assert new_schema[f"{RADIO_BUTTON_GROUP_PROP}0"]["options"].get(1)
assert not new_schema[f"{RADIO_BUTTON_GROUP_PROP}1"]["options"].get(1)
assert devices["33.33.33"].properties["on_mask"].new_value == 4
assert devices["33.33.33"].properties["off_mask"].new_value == 4
assert devices["33.33.33"].properties["on_mask"].is_dirty
assert devices["33.33.33"].properties["off_mask"].is_dirty
async def test_change_toggle_property(hass, hass_ws_client, properties_data):
"""Update a button's toggle mode."""
ws_client, devices = await _setup(hass, hass_ws_client, properties_data)
device = devices["33.33.33"]
toggle_props, _ = _get_toggle_properties(devices["33.33.33"])
# Make sure the baseline is correct
assert toggle_props[0]["name"] == f"{TOGGLE_PROP}{device.groups[1].name}"
assert toggle_props[0]["value"] == TOGGLE_MODES[TOGGLE_ON_OFF_MODE]
assert toggle_props[1]["value"] == TOGGLE_MODES[NON_TOGGLE_ON_MODE]
assert device.properties[NON_TOGGLE_MASK].value == 2
assert device.properties[NON_TOGGLE_ON_OFF_MASK].value == 2
assert not device.properties[NON_TOGGLE_MASK].is_dirty
assert not device.properties[NON_TOGGLE_ON_OFF_MASK].is_dirty
with patch.object(insteon.api.properties, "devices", devices):
await ws_client.send_json(
{
ID: 2,
TYPE: "insteon/properties/change",
DEVICE_ADDRESS: "33.33.33",
PROPERTY_NAME: toggle_props[0]["name"],
PROPERTY_VALUE: 1,
}
)
msg = await ws_client.receive_json()
assert msg["success"]
new_toggle_props, _ = _get_toggle_properties(devices["33.33.33"])
assert new_toggle_props[0]["value"] == TOGGLE_MODES[NON_TOGGLE_ON_MODE]
assert device.properties[NON_TOGGLE_MASK].new_value == 3
assert device.properties[NON_TOGGLE_ON_OFF_MASK].new_value == 3
assert device.properties[NON_TOGGLE_MASK].is_dirty
assert device.properties[NON_TOGGLE_ON_OFF_MASK].is_dirty
await ws_client.send_json(
{
ID: 3,
TYPE: "insteon/properties/change",
DEVICE_ADDRESS: "33.33.33",
PROPERTY_NAME: toggle_props[0]["name"],
PROPERTY_VALUE: 2,
}
)
msg = await ws_client.receive_json()
assert msg["success"]
new_toggle_props, _ = _get_toggle_properties(devices["33.33.33"])
assert new_toggle_props[0]["value"] == TOGGLE_MODES[NON_TOGGLE_OFF_MODE]
assert device.properties[NON_TOGGLE_MASK].new_value == 3
assert device.properties[NON_TOGGLE_ON_OFF_MASK].new_value is None
assert device.properties[NON_TOGGLE_MASK].is_dirty
assert not device.properties[NON_TOGGLE_ON_OFF_MASK].is_dirty
await ws_client.send_json(
{
ID: 4,
TYPE: "insteon/properties/change",
DEVICE_ADDRESS: "33.33.33",
PROPERTY_NAME: toggle_props[1]["name"],
PROPERTY_VALUE: 0,
}
)
msg = await ws_client.receive_json()
assert msg["success"]
new_toggle_props, _ = _get_toggle_properties(devices["33.33.33"])
assert new_toggle_props[1]["value"] == TOGGLE_MODES[TOGGLE_ON_OFF_MODE]
assert device.properties[NON_TOGGLE_MASK].new_value == 1
assert device.properties[NON_TOGGLE_ON_OFF_MASK].new_value == 0
assert device.properties[NON_TOGGLE_MASK].is_dirty
assert device.properties[NON_TOGGLE_ON_OFF_MASK].is_dirty
async def test_write_properties(hass, hass_ws_client, properties_data):
"""Test getting an Insteon device's properties."""
ws_client, devices = await _setup(hass, hass_ws_client, properties_data)
with patch.object(insteon.api.properties, "devices", devices):
await ws_client.send_json(
{ID: 2, TYPE: "insteon/properties/write", DEVICE_ADDRESS: "33.33.33"}
)
msg = await ws_client.receive_json()
assert msg["success"]
assert devices["33.33.33"].async_write_op_flags.call_count == 1
assert devices["33.33.33"].async_write_ext_properties.call_count == 1
async def test_write_properties_failure(hass, hass_ws_client, properties_data):
"""Test getting an Insteon device's properties."""
ws_client, devices = await _setup(hass, hass_ws_client, properties_data)
with patch.object(insteon.api.properties, "devices", devices):
await ws_client.send_json(
{ID: 2, TYPE: "insteon/properties/write", DEVICE_ADDRESS: "22.22.22"}
)
msg = await ws_client.receive_json()
assert not msg["success"]
assert msg["error"]["code"] == "write_failed"
async def test_load_properties(hass, hass_ws_client, properties_data):
"""Test getting an Insteon device's properties."""
ws_client, devices = await _setup(hass, hass_ws_client, properties_data)
with patch.object(insteon.api.properties, "devices", devices):
await ws_client.send_json(
{ID: 2, TYPE: "insteon/properties/load", DEVICE_ADDRESS: "33.33.33"}
)
msg = await ws_client.receive_json()
assert msg["success"]
assert devices["33.33.33"].async_read_op_flags.call_count == 1
assert devices["33.33.33"].async_read_ext_properties.call_count == 1
async def test_load_properties_failure(hass, hass_ws_client, properties_data):
"""Test getting an Insteon device's properties."""
ws_client, devices = await _setup(hass, hass_ws_client, properties_data)
with patch.object(insteon.api.properties, "devices", devices):
await ws_client.send_json(
{ID: 2, TYPE: "insteon/properties/load", DEVICE_ADDRESS: "22.22.22"}
)
msg = await ws_client.receive_json()
assert not msg["success"]
assert msg["error"]["code"] == "load_failed"
async def test_reset_properties(hass, hass_ws_client, properties_data):
"""Test getting an Insteon device's properties."""
ws_client, devices = await _setup(hass, hass_ws_client, properties_data)
device = devices["33.33.33"]
device.operating_flags["led_off"].new_value = True
device.properties["on_mask"].new_value = 100
assert device.operating_flags["led_off"].is_dirty
assert device.properties["on_mask"].is_dirty
with patch.object(insteon.api.properties, "devices", devices):
await ws_client.send_json(
{ID: 2, TYPE: "insteon/properties/reset", DEVICE_ADDRESS: "33.33.33"}
)
msg = await ws_client.receive_json()
assert msg["success"]
assert not device.operating_flags["led_off"].is_dirty
assert not device.properties["on_mask"].is_dirty
async def test_bad_address(hass, hass_ws_client, properties_data):
"""Test for a bad Insteon address."""
ws_client, _ = await _setup(hass, hass_ws_client, properties_data)
ws_id = 0
for call in ["get", "write", "load", "reset"]:
ws_id += 1
await ws_client.send_json(
{
ID: ws_id,
TYPE: f"insteon/properties/{call}",
DEVICE_ADDRESS: "99.99.99",
}
)
msg = await ws_client.receive_json()
assert not msg["success"]
assert msg["error"]["message"] == INSTEON_DEVICE_NOT_FOUND
ws_id += 1
await ws_client.send_json(
{
ID: ws_id,
TYPE: "insteon/properties/change",
DEVICE_ADDRESS: "99.99.99",
PROPERTY_NAME: "led_off",
PROPERTY_VALUE: True,
}
)
msg = await ws_client.receive_json()
assert not msg["success"]
assert msg["error"]["message"] == INSTEON_DEVICE_NOT_FOUND

67
tests/fixtures/insteon/aldb_data.json vendored Normal file
View file

@ -0,0 +1,67 @@
{
"4095": {
"memory": 4095,
"in_use": true,
"controller": false,
"high_water_mark": false,
"bit5": true,
"bit4": false,
"group": 0,
"target": "aaaaaa",
"data1": 0,
"data2": 0,
"data3": 0
},
"4087": {
"memory": 4087,
"in_use": true,
"controller": true,
"high_water_mark": false,
"bit5": true,
"bit4": false,
"group": 1,
"target": "aaaaaa",
"data1": 0,
"data2": 0,
"data3": 0
},
"4079": {
"memory": 4079,
"in_use": true,
"controller": false,
"high_water_mark": false,
"bit5": true,
"bit4": false,
"group": 0,
"target": "111111",
"data1": 0,
"data2": 0,
"data3": 0
},
"4071": {
"memory": 4071,
"in_use": true,
"controller": true,
"high_water_mark": false,
"bit5": true,
"bit4": false,
"group": 2,
"target": "222222",
"data1": 0,
"data2": 0,
"data3": 0
},
"4063": {
"memory": 4063,
"in_use": true,
"controller": false,
"high_water_mark": false,
"bit5": true,
"bit4": false,
"group": 3,
"target": "333333",
"data1": 0,
"data2": 0,
"data3": 0
}
}

View file

@ -0,0 +1,66 @@
{
"operating_flags": {
"program_lock_on": false,
"blink_on_tx_on": false,
"resume_dim_on": false,
"led_on": false,
"key_beep_on": false,
"rf_disable_on": false,
"powerline_disable_on": false,
"blink_on_error_on": false
},
"properties": {
"led_dimming": 10,
"non_toggle_mask": 2,
"non_toggle_on_off_mask": 2,
"trigger_group_mask": 0,
"on_mask": 0,
"off_mask": 0,
"x10_house": 32,
"x10_unit": 32,
"ramp_rate": 28,
"on_level": 255,
"on_mask_2": 0,
"off_mask_2": 0,
"x10_house_2": 32,
"x10_unit_2": 32,
"ramp_rate_2": 0,
"on_level_2": 0,
"on_mask_3": 0,
"off_mask_3": 0,
"x10_house_3": 32,
"x10_unit_3": 32,
"ramp_rate_3": 0,
"on_level_3": 0,
"on_mask_4": 16,
"off_mask_4": 16,
"x10_house_4": 32,
"x10_unit_4": 32,
"ramp_rate_4": 0,
"on_level_4": 0,
"on_mask_5": 0,
"off_mask_5": 0,
"x10_house_5": 32,
"x10_unit_5": 32,
"ramp_rate_5": 0,
"on_level_5": 0,
"on_mask_6": 0,
"off_mask_6": 0,
"x10_house_6": 32,
"x10_unit_6": 32,
"ramp_rate_6": 0,
"on_level_6": 0,
"on_mask_7": 128,
"off_mask_7": 128,
"x10_house_7": 32,
"x10_unit_7": 32,
"ramp_rate_7": 0,
"on_level_7": 0,
"on_mask_8": 64,
"off_mask_8": 64,
"x10_house_8": 32,
"x10_unit_8": 2,
"ramp_rate_8": 98,
"on_level_8": 74
}
}