diff --git a/homeassistant/components/insteon/__init__.py b/homeassistant/components/insteon/__init__.py index 2e2d801e1f2..223448953b9 100644 --- a/homeassistant/components/insteon/__init__.py +++ b/homeassistant/components/insteon/__init__.py @@ -9,6 +9,7 @@ from homeassistant.config_entries import SOURCE_IMPORT from homeassistant.const import CONF_PLATFORM, EVENT_HOMEASSISTANT_STOP from homeassistant.exceptions import ConfigEntryNotReady +from . import api from .const import ( CONF_CAT, CONF_DIM_STEPS, @@ -164,6 +165,8 @@ async def async_setup_entry(hass, entry): sw_version=f"{devices.modem.firmware:02x} Engine Version: {devices.modem.engine_version}", ) + api.async_load_api(hass) + asyncio.create_task(async_get_device_config(hass, entry)) return True diff --git a/homeassistant/components/insteon/api/__init__.py b/homeassistant/components/insteon/api/__init__.py new file mode 100644 index 00000000000..3b786a38343 --- /dev/null +++ b/homeassistant/components/insteon/api/__init__.py @@ -0,0 +1,44 @@ +"""Insteon API interface for the frontend.""" + +from homeassistant.components import websocket_api +from homeassistant.core import callback + +from .aldb import ( + websocket_add_default_links, + websocket_change_aldb_record, + websocket_create_aldb_record, + websocket_get_aldb, + websocket_load_aldb, + websocket_notify_on_aldb_status, + websocket_reset_aldb, + websocket_write_aldb, +) +from .device import websocket_get_device +from .properties import ( + websocket_change_properties_record, + websocket_get_properties, + websocket_load_properties, + websocket_reset_properties, + websocket_write_properties, +) + + +@callback +def async_load_api(hass): + """Set up the web socket API.""" + websocket_api.async_register_command(hass, websocket_get_device) + + websocket_api.async_register_command(hass, websocket_get_aldb) + websocket_api.async_register_command(hass, websocket_change_aldb_record) + websocket_api.async_register_command(hass, websocket_create_aldb_record) + websocket_api.async_register_command(hass, websocket_write_aldb) + websocket_api.async_register_command(hass, websocket_load_aldb) + websocket_api.async_register_command(hass, websocket_reset_aldb) + websocket_api.async_register_command(hass, websocket_add_default_links) + websocket_api.async_register_command(hass, websocket_notify_on_aldb_status) + + websocket_api.async_register_command(hass, websocket_get_properties) + websocket_api.async_register_command(hass, websocket_change_properties_record) + websocket_api.async_register_command(hass, websocket_write_properties) + websocket_api.async_register_command(hass, websocket_load_properties) + websocket_api.async_register_command(hass, websocket_reset_properties) diff --git a/homeassistant/components/insteon/api/aldb.py b/homeassistant/components/insteon/api/aldb.py new file mode 100644 index 00000000000..881cb0bb8c7 --- /dev/null +++ b/homeassistant/components/insteon/api/aldb.py @@ -0,0 +1,309 @@ +"""Web socket API for Insteon devices.""" + +from pyinsteon import devices +from pyinsteon.constants import ALDBStatus +from pyinsteon.topics import ( + ALDB_STATUS_CHANGED, + DEVICE_LINK_CONTROLLER_CREATED, + DEVICE_LINK_RESPONDER_CREATED, +) +from pyinsteon.utils import subscribe_topic, unsubscribe_topic +import voluptuous as vol + +from homeassistant.components import websocket_api +from homeassistant.core import HomeAssistant, callback + +from ..const import DEVICE_ADDRESS, ID, INSTEON_DEVICE_NOT_FOUND, TYPE +from .device import async_device_name, notify_device_not_found + +ALDB_RECORD = "record" +ALDB_RECORD_SCHEMA = vol.Schema( + { + vol.Required("mem_addr"): int, + vol.Required("in_use"): bool, + vol.Required("group"): vol.Range(0, 255), + vol.Required("is_controller"): bool, + vol.Optional("highwater"): bool, + vol.Required("target"): str, + vol.Optional("target_name"): str, + vol.Required("data1"): vol.Range(0, 255), + vol.Required("data2"): vol.Range(0, 255), + vol.Required("data3"): vol.Range(0, 255), + vol.Optional("dirty"): bool, + } +) + + +async def async_aldb_record_to_dict(dev_registry, record, dirty=False): + """Convert an ALDB record to a dict.""" + return ALDB_RECORD_SCHEMA( + { + "mem_addr": record.mem_addr, + "in_use": record.is_in_use, + "is_controller": record.is_controller, + "highwater": record.is_high_water_mark, + "group": record.group, + "target": str(record.target), + "target_name": await async_device_name(dev_registry, record.target), + "data1": record.data1, + "data2": record.data2, + "data3": record.data3, + "dirty": dirty, + } + ) + + +async def async_reload_and_save_aldb(hass, device): + """Add default links to an Insteon device.""" + if device == devices.modem: + await device.aldb.async_load() + else: + await device.aldb.async_load(refresh=True) + await devices.async_save(workdir=hass.config.config_dir) + + +@websocket_api.websocket_command( + {vol.Required(TYPE): "insteon/aldb/get", vol.Required(DEVICE_ADDRESS): str} +) +@websocket_api.require_admin +@websocket_api.async_response +async def websocket_get_aldb( + hass: HomeAssistant, + connection: websocket_api.connection.ActiveConnection, + msg: dict, +) -> None: + """Get the All-Link Database for an Insteon device.""" + device = devices[msg[DEVICE_ADDRESS]] + if not device: + notify_device_not_found(connection, msg, INSTEON_DEVICE_NOT_FOUND) + return + + # Convert the ALDB to a dict merge in pending changes + aldb = {mem_addr: device.aldb[mem_addr] for mem_addr in device.aldb} + aldb.update(device.aldb.pending_changes) + changed_records = list(device.aldb.pending_changes.keys()) + + dev_registry = await hass.helpers.device_registry.async_get_registry() + + records = [ + await async_aldb_record_to_dict( + dev_registry, aldb[mem_addr], mem_addr in changed_records + ) + for mem_addr in aldb + ] + + connection.send_result(msg[ID], records) + + +@websocket_api.websocket_command( + { + vol.Required(TYPE): "insteon/aldb/change", + vol.Required(DEVICE_ADDRESS): str, + vol.Required(ALDB_RECORD): ALDB_RECORD_SCHEMA, + } +) +@websocket_api.require_admin +@websocket_api.async_response +async def websocket_change_aldb_record( + hass: HomeAssistant, + connection: websocket_api.connection.ActiveConnection, + msg: dict, +) -> None: + """Change an All-Link Database record for an Insteon device.""" + device = devices[msg[DEVICE_ADDRESS]] + if not device: + notify_device_not_found(connection, msg, INSTEON_DEVICE_NOT_FOUND) + return + + record = msg[ALDB_RECORD] + device.aldb.modify( + mem_addr=record["mem_addr"], + in_use=record["in_use"], + group=record["group"], + controller=record["is_controller"], + target=record["target"], + data1=record["data1"], + data2=record["data2"], + data3=record["data3"], + ) + connection.send_result(msg[ID]) + + +@websocket_api.websocket_command( + { + vol.Required(TYPE): "insteon/aldb/create", + vol.Required(DEVICE_ADDRESS): str, + vol.Required(ALDB_RECORD): ALDB_RECORD_SCHEMA, + } +) +@websocket_api.require_admin +@websocket_api.async_response +async def websocket_create_aldb_record( + hass: HomeAssistant, + connection: websocket_api.connection.ActiveConnection, + msg: dict, +) -> None: + """Create an All-Link Database record for an Insteon device.""" + device = devices[msg[DEVICE_ADDRESS]] + if not device: + notify_device_not_found(connection, msg, INSTEON_DEVICE_NOT_FOUND) + return + + record = msg[ALDB_RECORD] + device.aldb.add( + group=record["group"], + controller=record["is_controller"], + target=record["target"], + data1=record["data1"], + data2=record["data2"], + data3=record["data3"], + ) + connection.send_result(msg[ID]) + + +@websocket_api.websocket_command( + { + vol.Required(TYPE): "insteon/aldb/write", + vol.Required(DEVICE_ADDRESS): str, + } +) +@websocket_api.require_admin +@websocket_api.async_response +async def websocket_write_aldb( + hass: HomeAssistant, + connection: websocket_api.connection.ActiveConnection, + msg: dict, +) -> None: + """Create an All-Link Database record for an Insteon device.""" + device = devices[msg[DEVICE_ADDRESS]] + if not device: + notify_device_not_found(connection, msg, INSTEON_DEVICE_NOT_FOUND) + return + + await device.aldb.async_write() + hass.async_create_task(async_reload_and_save_aldb(hass, device)) + connection.send_result(msg[ID]) + + +@websocket_api.websocket_command( + { + vol.Required(TYPE): "insteon/aldb/load", + vol.Required(DEVICE_ADDRESS): str, + } +) +@websocket_api.require_admin +@websocket_api.async_response +async def websocket_load_aldb( + hass: HomeAssistant, + connection: websocket_api.connection.ActiveConnection, + msg: dict, +) -> None: + """Create an All-Link Database record for an Insteon device.""" + device = devices[msg[DEVICE_ADDRESS]] + if not device: + notify_device_not_found(connection, msg, INSTEON_DEVICE_NOT_FOUND) + return + + hass.async_create_task(async_reload_and_save_aldb(hass, device)) + connection.send_result(msg[ID]) + + +@websocket_api.websocket_command( + { + vol.Required(TYPE): "insteon/aldb/reset", + vol.Required(DEVICE_ADDRESS): str, + } +) +@websocket_api.require_admin +@websocket_api.async_response +async def websocket_reset_aldb( + hass: HomeAssistant, + connection: websocket_api.connection.ActiveConnection, + msg: dict, +) -> None: + """Create an All-Link Database record for an Insteon device.""" + device = devices[msg[DEVICE_ADDRESS]] + if not device: + notify_device_not_found(connection, msg, INSTEON_DEVICE_NOT_FOUND) + return + + device.aldb.clear_pending() + connection.send_result(msg[ID]) + + +@websocket_api.websocket_command( + { + vol.Required(TYPE): "insteon/aldb/add_default_links", + vol.Required(DEVICE_ADDRESS): str, + } +) +@websocket_api.require_admin +@websocket_api.async_response +async def websocket_add_default_links( + hass: HomeAssistant, + connection: websocket_api.connection.ActiveConnection, + msg: dict, +) -> None: + """Add the default All-Link Database records for an Insteon device.""" + device = devices[msg[DEVICE_ADDRESS]] + if not device: + notify_device_not_found(connection, msg, INSTEON_DEVICE_NOT_FOUND) + return + + device.aldb.clear_pending() + await device.async_add_default_links() + hass.async_create_task(async_reload_and_save_aldb(hass, device)) + connection.send_result(msg[ID]) + + +@websocket_api.websocket_command( + { + vol.Required(TYPE): "insteon/aldb/notify", + vol.Required(DEVICE_ADDRESS): str, + } +) +@websocket_api.require_admin +@websocket_api.async_response +async def websocket_notify_on_aldb_status( + hass: HomeAssistant, + connection: websocket_api.connection.ActiveConnection, + msg: dict, +) -> None: + """Tell Insteon a new ALDB record was added.""" + + device = devices[msg[DEVICE_ADDRESS]] + if not device: + notify_device_not_found(connection, msg, INSTEON_DEVICE_NOT_FOUND) + return + + @callback + def record_added(controller, responder, group): + """Forward ALDB events to websocket.""" + forward_data = {"type": "record_loaded"} + connection.send_message(websocket_api.event_message(msg["id"], forward_data)) + + @callback + def aldb_loaded(): + """Forward ALDB loaded event to websocket.""" + forward_data = { + "type": "status_changed", + "is_loading": device.aldb.status == ALDBStatus.LOADING, + } + connection.send_message(websocket_api.event_message(msg["id"], forward_data)) + + @callback + def async_cleanup() -> None: + """Remove signal listeners.""" + unsubscribe_topic(record_added, f"{DEVICE_LINK_CONTROLLER_CREATED}.{device.id}") + unsubscribe_topic(record_added, f"{DEVICE_LINK_RESPONDER_CREATED}.{device.id}") + unsubscribe_topic(aldb_loaded, f"{device.id}.{ALDB_STATUS_CHANGED}") + + forward_data = {"type": "unsubscribed"} + connection.send_message(websocket_api.event_message(msg["id"], forward_data)) + + connection.subscriptions[msg["id"]] = async_cleanup + subscribe_topic(record_added, f"{DEVICE_LINK_CONTROLLER_CREATED}.{device.id}") + subscribe_topic(record_added, f"{DEVICE_LINK_RESPONDER_CREATED}.{device.id}") + subscribe_topic(aldb_loaded, f"{device.id}.{ALDB_STATUS_CHANGED}") + + connection.send_result(msg[ID]) diff --git a/homeassistant/components/insteon/api/device.py b/homeassistant/components/insteon/api/device.py new file mode 100644 index 00000000000..9d77e8b765c --- /dev/null +++ b/homeassistant/components/insteon/api/device.py @@ -0,0 +1,79 @@ +"""API interface to get an Insteon device.""" + +from pyinsteon import devices +import voluptuous as vol + +from homeassistant.components import websocket_api +from homeassistant.core import HomeAssistant + +from ..const import ( + DEVICE_ID, + DOMAIN, + HA_DEVICE_NOT_FOUND, + ID, + INSTEON_DEVICE_NOT_FOUND, + TYPE, +) + + +def compute_device_name(ha_device): + """Return the HA device name.""" + return ha_device.name_by_user if ha_device.name_by_user else ha_device.name + + +def get_insteon_device_from_ha_device(ha_device): + """Return the Insteon device from an HA device.""" + for identifier in ha_device.identifiers: + if len(identifier) > 1 and identifier[0] == DOMAIN and devices[identifier[1]]: + return devices[identifier[1]] + return None + + +async def async_device_name(dev_registry, address): + """Get the Insteon device name from a device registry id.""" + ha_device = dev_registry.async_get_device( + identifiers={(DOMAIN, str(address))}, connections=set() + ) + if not ha_device: + device = devices[address] + if device: + return f"{device.description} ({device.model})" + return "" + return compute_device_name(ha_device) + + +def notify_device_not_found(connection, msg, text): + """Notify the caller that the device was not found.""" + connection.send_message( + websocket_api.error_message(msg[ID], websocket_api.const.ERR_NOT_FOUND, text) + ) + + +@websocket_api.websocket_command( + {vol.Required(TYPE): "insteon/device/get", vol.Required(DEVICE_ID): str} +) +@websocket_api.require_admin +@websocket_api.async_response +async def websocket_get_device( + hass: HomeAssistant, + connection: websocket_api.connection.ActiveConnection, + msg: dict, +) -> None: + """Get an Insteon device.""" + dev_registry = await hass.helpers.device_registry.async_get_registry() + ha_device = dev_registry.async_get(msg[DEVICE_ID]) + if not ha_device: + notify_device_not_found(connection, msg, HA_DEVICE_NOT_FOUND) + return + device = get_insteon_device_from_ha_device(ha_device) + if not device: + notify_device_not_found(connection, msg, INSTEON_DEVICE_NOT_FOUND) + return + ha_name = compute_device_name(ha_device) + device_info = { + "name": ha_name, + "address": str(device.address), + "is_battery": device.is_battery, + "aldb_status": str(device.aldb.status), + } + connection.send_result(msg[ID], device_info) diff --git a/homeassistant/components/insteon/api/properties.py b/homeassistant/components/insteon/api/properties.py new file mode 100644 index 00000000000..0b3b643b617 --- /dev/null +++ b/homeassistant/components/insteon/api/properties.py @@ -0,0 +1,420 @@ +"""Property update methods and schemas.""" +from itertools import chain + +from pyinsteon import devices +from pyinsteon.constants import RAMP_RATES, ResponseStatus +from pyinsteon.device_types.device_base import Device +from pyinsteon.extended_property import ( + NON_TOGGLE_MASK, + NON_TOGGLE_ON_OFF_MASK, + OFF_MASK, + ON_MASK, + RAMP_RATE, +) +from pyinsteon.utils import ramp_rate_to_seconds, seconds_to_ramp_rate +import voluptuous as vol +import voluptuous_serialize + +from homeassistant.components import websocket_api +from homeassistant.core import HomeAssistant +import homeassistant.helpers.config_validation as cv + +from ..const import ( + DEVICE_ADDRESS, + ID, + INSTEON_DEVICE_NOT_FOUND, + PROPERTY_NAME, + PROPERTY_VALUE, + TYPE, +) +from .device import notify_device_not_found + +TOGGLE_ON_OFF_MODE = "toggle_on_off_mode" +NON_TOGGLE_ON_MODE = "non_toggle_on_mode" +NON_TOGGLE_OFF_MODE = "non_toggle_off_mode" +RADIO_BUTTON_GROUP_PROP = "radio_button_group_" +TOGGLE_PROP = "toggle_" +RAMP_RATE_SECONDS = list(dict.fromkeys(RAMP_RATES.values())) +RAMP_RATE_SECONDS.sort() +TOGGLE_MODES = {TOGGLE_ON_OFF_MODE: 0, NON_TOGGLE_ON_MODE: 1, NON_TOGGLE_OFF_MODE: 2} +TOGGLE_MODES_SCHEMA = { + 0: TOGGLE_ON_OFF_MODE, + 1: NON_TOGGLE_ON_MODE, + 2: NON_TOGGLE_OFF_MODE, +} + + +def _bool_schema(name): + return voluptuous_serialize.convert(vol.Schema({vol.Required(name): bool}))[0] + + +def _byte_schema(name): + return voluptuous_serialize.convert(vol.Schema({vol.Required(name): cv.byte}))[0] + + +def _ramp_rate_schema(name): + return voluptuous_serialize.convert( + vol.Schema({vol.Required(name): vol.In(RAMP_RATE_SECONDS)}), + custom_serializer=cv.custom_serializer, + )[0] + + +def get_properties(device: Device): + """Get the properties of an Insteon device and return the records and schema.""" + + properties = [] + schema = {} + + # Limit the properties we manage at this time. + for prop_name in device.operating_flags: + if not device.operating_flags[prop_name].is_read_only: + prop_dict, schema_dict = _get_property(device.operating_flags[prop_name]) + properties.append(prop_dict) + schema[prop_name] = schema_dict + + mask_found = False + for prop_name in device.properties: + if device.properties[prop_name].is_read_only: + continue + + if prop_name == RAMP_RATE: + rr_prop, rr_schema = _get_ramp_rate_property(device.properties[prop_name]) + properties.append(rr_prop) + schema[RAMP_RATE] = rr_schema + + elif not mask_found and "mask" in prop_name: + mask_found = True + toggle_props, toggle_schema = _get_toggle_properties(device) + properties.extend(toggle_props) + schema.update(toggle_schema) + + rb_props, rb_schema = _get_radio_button_properties(device) + properties.extend(rb_props) + schema.update(rb_schema) + else: + prop_dict, schema_dict = _get_property(device.properties[prop_name]) + properties.append(prop_dict) + schema[prop_name] = schema_dict + + return properties, schema + + +def set_property(device, prop_name: str, value): + """Update a property value.""" + if isinstance(value, bool) and prop_name in device.operating_flags: + device.operating_flags[prop_name].new_value = value + + elif prop_name == RAMP_RATE: + device.properties[prop_name].new_value = seconds_to_ramp_rate(value) + + elif prop_name.startswith(RADIO_BUTTON_GROUP_PROP): + buttons = [int(button) for button in value] + rb_groups = _calc_radio_button_groups(device) + curr_group = int(prop_name[len(RADIO_BUTTON_GROUP_PROP) :]) + if len(rb_groups) > curr_group: + removed = [btn for btn in rb_groups[curr_group] if btn not in buttons] + if removed: + device.clear_radio_buttons(removed) + if buttons: + device.set_radio_buttons(buttons) + + elif prop_name.startswith(TOGGLE_PROP): + button_name = prop_name[len(TOGGLE_PROP) :] + for button in device.groups: + if device.groups[button].name == button_name: + device.set_toggle_mode(button, int(value)) + + else: + device.properties[prop_name].new_value = value + + +def _get_property(prop): + """Return a property data row.""" + value, modified = _get_usable_value(prop) + prop_dict = {"name": prop.name, "value": value, "modified": modified} + if isinstance(prop.value, bool): + schema = _bool_schema(prop.name) + else: + schema = _byte_schema(prop.name) + return prop_dict, {"name": prop.name, **schema} + + +def _get_toggle_properties(device): + """Generate the mask properties for a KPL device.""" + props = [] + schema = {} + toggle_prop = device.properties[NON_TOGGLE_MASK] + toggle_on_prop = device.properties[NON_TOGGLE_ON_OFF_MASK] + for button in device.groups: + name = f"{TOGGLE_PROP}{device.groups[button].name}" + value, modified = _toggle_button_value(toggle_prop, toggle_on_prop, button) + props.append({"name": name, "value": value, "modified": modified}) + toggle_schema = vol.Schema({vol.Required(name): vol.In(TOGGLE_MODES_SCHEMA)}) + toggle_schema_dict = voluptuous_serialize.convert( + toggle_schema, custom_serializer=cv.custom_serializer + ) + schema[name] = toggle_schema_dict[0] + return props, schema + + +def _toggle_button_value(non_toggle_prop, toggle_on_prop, button): + """Determine the toggle value of a button.""" + toggle_mask, toggle_modified = _get_usable_value(non_toggle_prop) + toggle_on_mask, toggle_on_modified = _get_usable_value(toggle_on_prop) + + bit = button - 1 + if not toggle_mask & 1 << bit: + value = 0 + else: + if toggle_on_mask & 1 << bit: + value = 1 + else: + value = 2 + + modified = False + if toggle_modified: + curr_bit = non_toggle_prop.value & 1 << bit + new_bit = non_toggle_prop.new_value & 1 << bit + modified = not curr_bit == new_bit + + if not modified and value != 0 and toggle_on_modified: + curr_bit = toggle_on_prop.value & 1 << bit + new_bit = toggle_on_prop.new_value & 1 << bit + modified = not curr_bit == new_bit + + return value, modified + + +def _get_radio_button_properties(device): + """Return the values and schema to set KPL buttons as radio buttons.""" + rb_groups = _calc_radio_button_groups(device) + props = [] + schema = {} + index = 0 + remaining_buttons = [] + + buttons_in_groups = list(chain.from_iterable(rb_groups)) + + # Identify buttons not belonging to any group + for button in device.groups: + if button not in buttons_in_groups: + remaining_buttons.append(button) + + for rb_group in rb_groups: + name = f"{RADIO_BUTTON_GROUP_PROP}{index}" + button_1 = rb_group[0] + button_str = f"_{button_1}" if button_1 != 1 else "" + on_mask = device.properties[f"{ON_MASK}{button_str}"] + off_mask = device.properties[f"{OFF_MASK}{button_str}"] + modified = on_mask.is_dirty or off_mask.is_dirty + + props.append( + { + "name": name, + "modified": modified, + "value": rb_group, + } + ) + + options = { + button: device.groups[button].name + for button in chain.from_iterable([rb_group, remaining_buttons]) + } + rb_schema = vol.Schema({vol.Optional(name): cv.multi_select(options)}) + + rb_schema_dict = voluptuous_serialize.convert( + rb_schema, custom_serializer=cv.custom_serializer + ) + schema[name] = rb_schema_dict[0] + + index += 1 + + if len(remaining_buttons) > 1: + name = f"{RADIO_BUTTON_GROUP_PROP}{index}" + + props.append( + { + "name": name, + "modified": False, + "value": [], + } + ) + + options = {button: device.groups[button].name for button in remaining_buttons} + rb_schema = vol.Schema({vol.Optional(name): cv.multi_select(options)}) + + rb_schema_dict = voluptuous_serialize.convert( + rb_schema, custom_serializer=cv.custom_serializer + ) + schema[name] = rb_schema_dict[0] + + return props, schema + + +def _calc_radio_button_groups(device): + """Return existing radio button groups.""" + rb_groups = [] + for button in device.groups: + if button not in list(chain.from_iterable(rb_groups)): + button_str = "" if button == 1 else f"_{button}" + on_mask, _ = _get_usable_value(device.properties[f"{ON_MASK}{button_str}"]) + if on_mask != 0: + rb_group = [button] + for bit in list(range(0, button - 1)) + list(range(button, 8)): + if on_mask & 1 << bit: + rb_group.append(bit + 1) + if len(rb_group) > 1: + rb_groups.append(rb_group) + return rb_groups + + +def _get_ramp_rate_property(prop): + """Return the value and schema of a ramp rate property.""" + rr_prop, _ = _get_property(prop) + rr_prop["value"] = ramp_rate_to_seconds(rr_prop["value"]) + return rr_prop, _ramp_rate_schema(prop.name) + + +def _get_usable_value(prop): + """Return the current or the modified value of a property.""" + value = prop.value if prop.new_value is None else prop.new_value + return value, prop.is_dirty + + +@websocket_api.websocket_command( + { + vol.Required(TYPE): "insteon/properties/get", + vol.Required(DEVICE_ADDRESS): str, + } +) +@websocket_api.require_admin +@websocket_api.async_response +async def websocket_get_properties( + hass: HomeAssistant, + connection: websocket_api.connection.ActiveConnection, + msg: dict, +) -> None: + """Add the default All-Link Database records for an Insteon device.""" + device = devices[msg[DEVICE_ADDRESS]] + if not device: + notify_device_not_found(connection, msg, INSTEON_DEVICE_NOT_FOUND) + return + + properties, schema = get_properties(device) + + connection.send_result(msg[ID], {"properties": properties, "schema": schema}) + + +@websocket_api.websocket_command( + { + vol.Required(TYPE): "insteon/properties/change", + vol.Required(DEVICE_ADDRESS): str, + vol.Required(PROPERTY_NAME): str, + vol.Required(PROPERTY_VALUE): vol.Any(list, int, float, bool, str), + } +) +@websocket_api.require_admin +@websocket_api.async_response +async def websocket_change_properties_record( + hass: HomeAssistant, + connection: websocket_api.connection.ActiveConnection, + msg: dict, +) -> None: + """Add the default All-Link Database records for an Insteon device.""" + device = devices[msg[DEVICE_ADDRESS]] + if not device: + notify_device_not_found(connection, msg, INSTEON_DEVICE_NOT_FOUND) + return + + set_property(device, msg[PROPERTY_NAME], msg[PROPERTY_VALUE]) + connection.send_result(msg[ID]) + + +@websocket_api.websocket_command( + { + vol.Required(TYPE): "insteon/properties/write", + vol.Required(DEVICE_ADDRESS): str, + } +) +@websocket_api.require_admin +@websocket_api.async_response +async def websocket_write_properties( + hass: HomeAssistant, + connection: websocket_api.connection.ActiveConnection, + msg: dict, +) -> None: + """Add the default All-Link Database records for an Insteon device.""" + device = devices[msg[DEVICE_ADDRESS]] + if not device: + notify_device_not_found(connection, msg, INSTEON_DEVICE_NOT_FOUND) + return + + result1 = await device.async_write_op_flags() + result2 = await device.async_write_ext_properties() + await devices.async_save(workdir=hass.config.config_dir) + if result1 != ResponseStatus.SUCCESS or result2 != ResponseStatus.SUCCESS: + connection.send_message( + websocket_api.error_message( + msg[ID], "write_failed", "properties not written to device" + ) + ) + return + connection.send_result(msg[ID]) + + +@websocket_api.websocket_command( + { + vol.Required(TYPE): "insteon/properties/load", + vol.Required(DEVICE_ADDRESS): str, + } +) +@websocket_api.require_admin +@websocket_api.async_response +async def websocket_load_properties( + hass: HomeAssistant, + connection: websocket_api.connection.ActiveConnection, + msg: dict, +) -> None: + """Add the default All-Link Database records for an Insteon device.""" + device = devices[msg[DEVICE_ADDRESS]] + if not device: + notify_device_not_found(connection, msg, INSTEON_DEVICE_NOT_FOUND) + return + + result1 = await device.async_read_op_flags() + result2 = await device.async_read_ext_properties() + await devices.async_save(workdir=hass.config.config_dir) + if result1 != ResponseStatus.SUCCESS or result2 != ResponseStatus.SUCCESS: + connection.send_message( + websocket_api.error_message( + msg[ID], "load_failed", "properties not loaded from device" + ) + ) + return + connection.send_result(msg[ID]) + + +@websocket_api.websocket_command( + { + vol.Required(TYPE): "insteon/properties/reset", + vol.Required(DEVICE_ADDRESS): str, + } +) +@websocket_api.require_admin +@websocket_api.async_response +async def websocket_reset_properties( + hass: HomeAssistant, + connection: websocket_api.connection.ActiveConnection, + msg: dict, +) -> None: + """Add the default All-Link Database records for an Insteon device.""" + device = devices[msg[DEVICE_ADDRESS]] + if not device: + notify_device_not_found(connection, msg, INSTEON_DEVICE_NOT_FOUND) + return + + for prop in device.operating_flags: + device.operating_flags[prop].new_value = None + for prop in device.properties: + device.properties[prop].new_value = None + connection.send_result(msg[ID]) diff --git a/homeassistant/components/insteon/const.py b/homeassistant/components/insteon/const.py index a40a0b0d4b0..dca53d20369 100644 --- a/homeassistant/components/insteon/const.py +++ b/homeassistant/components/insteon/const.py @@ -1,4 +1,6 @@ """Constants used by insteon component.""" +import re + from pyinsteon.groups import ( CO_SENSOR, COVER, @@ -158,3 +160,15 @@ STATE_NAME_LABEL_MAP = { COVER: "Cover", RELAY: "Relay", } + +TYPE = "type" +ID = "id" +DEVICE_ID = "device_id" +DEVICE_ADDRESS = "device_address" +ALDB_RECORD = "record" +PROPERTY_NAME = "name" +PROPERTY_VALUE = "value" +HA_DEVICE_NOT_FOUND = "ha_device_not_found" +INSTEON_DEVICE_NOT_FOUND = "insteon_device_not_found" + +INSTEON_ADDR_REGEX = re.compile(r"([A-Fa-f0-9]{2}\.?[A-Fa-f0-9]{2}\.?[A-Fa-f0-9]{2})$") diff --git a/homeassistant/components/insteon/manifest.json b/homeassistant/components/insteon/manifest.json index 4643a8c662a..f5f9d57d8a8 100644 --- a/homeassistant/components/insteon/manifest.json +++ b/homeassistant/components/insteon/manifest.json @@ -10,4 +10,4 @@ ], "config_flow": true, "iot_class": "local_push" -} \ No newline at end of file +} diff --git a/homeassistant/components/insteon/schemas.py b/homeassistant/components/insteon/schemas.py index 5fb46735f29..626dc7dde4b 100644 --- a/homeassistant/components/insteon/schemas.py +++ b/homeassistant/components/insteon/schemas.py @@ -40,6 +40,7 @@ from .const import ( CONF_X10_ALL_UNITS_OFF, DOMAIN, HOUSECODES, + INSTEON_ADDR_REGEX, PORT_HUB_V1, PORT_HUB_V2, SRV_ALL_LINK_GROUP, @@ -64,6 +65,13 @@ def set_default_port(schema: dict) -> dict: return schema +def insteon_address(value: str) -> str: + """Validate an Insteon address.""" + if not INSTEON_ADDR_REGEX.match(value): + raise vol.Invalid("Invalid Insteon Address") + return str(value).replace(".", "").lower() + + CONF_DEVICE_OVERRIDE_SCHEMA = vol.All( vol.Schema( { diff --git a/tests/components/insteon/mock_connection.py b/tests/components/insteon/mock_connection.py new file mode 100644 index 00000000000..00d2c1ec83a --- /dev/null +++ b/tests/components/insteon/mock_connection.py @@ -0,0 +1,11 @@ +"""Mock connections for Insteon.""" + + +async def mock_successful_connection(*args, **kwargs): + """Return a successful connection.""" + return True + + +async def mock_failed_connection(*args, **kwargs): + """Return a failed connection.""" + raise ConnectionError("Connection failed") diff --git a/tests/components/insteon/mock_devices.py b/tests/components/insteon/mock_devices.py index 7ffb0672161..e28e25bf41b 100644 --- a/tests/components/insteon/mock_devices.py +++ b/tests/components/insteon/mock_devices.py @@ -2,11 +2,14 @@ from unittest.mock import AsyncMock, MagicMock from pyinsteon.address import Address +from pyinsteon.constants import ALDBStatus, ResponseStatus from pyinsteon.device_types import ( - GeneralController_MiniRemote_4, + DimmableLightingControl_KeypadLinc_8, + GeneralController, Hub, SwitchedLightingControl_SwitchLinc, ) +from pyinsteon.managers.saved_devices_manager import dict_to_aldb_record class MockSwitchLinc(SwitchedLightingControl_SwitchLinc): @@ -32,7 +35,7 @@ class MockDevices: def __getitem__(self, address): """Return a a device from the device address.""" - return self._devices.get(address) + return self._devices.get(Address(address)) def __iter__(self): """Return an iterator of device addresses.""" @@ -53,13 +56,73 @@ class MockDevices: addr1 = Address("11.11.11") addr2 = Address("22.22.22") addr3 = Address("33.33.33") - self._devices[addr0] = Hub(addr0) - self._devices[addr1] = MockSwitchLinc(addr1, 0x02, 0x00) - self._devices[addr2] = GeneralController_MiniRemote_4(addr2, 0x00, 0x00) - self._devices[addr3] = SwitchedLightingControl_SwitchLinc(addr3, 0x02, 0x00) + self._devices[addr0] = Hub(addr0, 0x03, 0x00, 0x00, "Hub AA.AA.AA", "0") + self._devices[addr1] = MockSwitchLinc( + addr1, 0x02, 0x00, 0x00, "Device 11.11.11", "1" + ) + self._devices[addr2] = GeneralController( + addr2, 0x00, 0x00, 0x00, "Device 22.22.22", "2" + ) + self._devices[addr3] = DimmableLightingControl_KeypadLinc_8( + addr3, 0x02, 0x00, 0x00, "Device 33.33.33", "3" + ) + for device in [self._devices[addr] for addr in [addr1, addr2, addr3]]: device.async_read_config = AsyncMock() + device.aldb.async_write = AsyncMock() + device.aldb.async_load = AsyncMock() + device.async_add_default_links = AsyncMock() + device.async_read_op_flags = AsyncMock( + return_value=ResponseStatus.SUCCESS + ) + device.async_read_ext_properties = AsyncMock( + return_value=ResponseStatus.SUCCESS + ) + device.async_write_op_flags = AsyncMock( + return_value=ResponseStatus.SUCCESS + ) + device.async_write_ext_properties = AsyncMock( + return_value=ResponseStatus.SUCCESS + ) + for device in [self._devices[addr] for addr in [addr2, addr3]]: device.async_status = AsyncMock() self._devices[addr1].async_status = AsyncMock(side_effect=AttributeError) + self._devices[addr0].aldb.async_load = AsyncMock() + + self._devices[addr2].async_read_op_flags = AsyncMock( + return_value=ResponseStatus.FAILURE + ) + self._devices[addr2].async_read_ext_properties = AsyncMock( + return_value=ResponseStatus.FAILURE + ) + self._devices[addr2].async_write_op_flags = AsyncMock( + return_value=ResponseStatus.FAILURE + ) + self._devices[addr2].async_write_ext_properties = AsyncMock( + return_value=ResponseStatus.FAILURE + ) + self.modem = self._devices[addr0] + + def fill_aldb(self, address, records): + """Fill the All-Link Database for a device.""" + device = self._devices[Address(address)] + aldb_records = dict_to_aldb_record(records) + + device.aldb.load_saved_records(ALDBStatus.LOADED, aldb_records) + + def fill_properties(self, address, props_dict): + """Fill the operating flags and extended properties of a device.""" + device = self._devices[Address(address)] + operating_flags = props_dict.get("operating_flags", {}) + properties = props_dict.get("properties", {}) + + for flag in operating_flags: + value = operating_flags[flag] + if device.operating_flags.get(flag): + device.operating_flags[flag].load(value) + for flag in properties: + value = properties[flag] + if device.properties.get(flag): + device.properties[flag].load(value) diff --git a/tests/components/insteon/test_api_aldb.py b/tests/components/insteon/test_api_aldb.py new file mode 100644 index 00000000000..d360b34d7b9 --- /dev/null +++ b/tests/components/insteon/test_api_aldb.py @@ -0,0 +1,288 @@ +"""Test the Insteon All-Link Database APIs.""" + +import json +from unittest.mock import patch + +from pyinsteon import pub +from pyinsteon.address import Address +from pyinsteon.topics import ALDB_STATUS_CHANGED, DEVICE_LINK_CONTROLLER_CREATED +import pytest + +from homeassistant.components import insteon +from homeassistant.components.insteon.api import async_load_api +from homeassistant.components.insteon.api.aldb import ( + ALDB_RECORD, + DEVICE_ADDRESS, + ID, + TYPE, +) +from homeassistant.components.insteon.api.device import INSTEON_DEVICE_NOT_FOUND + +from .mock_devices import MockDevices + +from tests.common import load_fixture + + +@pytest.fixture(name="aldb_data", scope="session") +def aldb_data_fixture(): + """Load the controller state fixture data.""" + return json.loads(load_fixture("insteon/aldb_data.json")) + + +async def _setup(hass, hass_ws_client, aldb_data): + """Set up tests.""" + ws_client = await hass_ws_client(hass) + devices = MockDevices() + await devices.async_load() + async_load_api(hass) + devices.fill_aldb("33.33.33", aldb_data) + return ws_client, devices + + +def _compare_records(aldb_rec, dict_rec): + """Compare a record in the ALDB to the dictionary record.""" + assert aldb_rec.is_in_use == dict_rec["in_use"] + assert aldb_rec.is_controller == (dict_rec["is_controller"]) + assert not aldb_rec.is_high_water_mark + assert aldb_rec.group == dict_rec["group"] + assert aldb_rec.target == Address(dict_rec["target"]) + assert aldb_rec.data1 == dict_rec["data1"] + assert aldb_rec.data2 == dict_rec["data2"] + assert aldb_rec.data3 == dict_rec["data3"] + + +def _aldb_dict(mem_addr): + """Generate an ALDB record as a dictionary.""" + return { + "mem_addr": mem_addr, + "in_use": True, + "is_controller": True, + "highwater": False, + "group": 100, + "target": "111111", + "data1": 101, + "data2": 102, + "data3": 103, + "dirty": True, + } + + +async def test_get_aldb(hass, hass_ws_client, aldb_data): + """Test getting an Insteon device's All-Link Database.""" + ws_client, devices = await _setup(hass, hass_ws_client, aldb_data) + + with patch.object(insteon.api.aldb, "devices", devices): + await ws_client.send_json( + {ID: 2, TYPE: "insteon/aldb/get", DEVICE_ADDRESS: "33.33.33"} + ) + msg = await ws_client.receive_json() + result = msg["result"] + + assert len(result) == 5 + + +async def test_change_aldb_record(hass, hass_ws_client, aldb_data): + """Test changing an Insteon device's All-Link Database record.""" + ws_client, devices = await _setup(hass, hass_ws_client, aldb_data) + change_rec = _aldb_dict(4079) + + with patch.object(insteon.api.aldb, "devices", devices): + await ws_client.send_json( + { + ID: 2, + TYPE: "insteon/aldb/change", + DEVICE_ADDRESS: "33.33.33", + ALDB_RECORD: change_rec, + } + ) + msg = await ws_client.receive_json() + assert msg["success"] + assert len(devices["33.33.33"].aldb.pending_changes) == 1 + rec = devices["33.33.33"].aldb.pending_changes[4079] + _compare_records(rec, change_rec) + + +async def test_create_aldb_record(hass, hass_ws_client, aldb_data): + """Test creating a new Insteon All-Link Database record.""" + ws_client, devices = await _setup(hass, hass_ws_client, aldb_data) + new_rec = _aldb_dict(4079) + + with patch.object(insteon.api.aldb, "devices", devices): + await ws_client.send_json( + { + ID: 2, + TYPE: "insteon/aldb/create", + DEVICE_ADDRESS: "33.33.33", + ALDB_RECORD: new_rec, + } + ) + msg = await ws_client.receive_json() + assert msg["success"] + assert len(devices["33.33.33"].aldb.pending_changes) == 1 + rec = devices["33.33.33"].aldb.pending_changes[-1] + _compare_records(rec, new_rec) + + +async def test_write_aldb(hass, hass_ws_client, aldb_data): + """Test writing an Insteon device's All-Link Database.""" + ws_client, devices = await _setup(hass, hass_ws_client, aldb_data) + + with patch.object(insteon.api.aldb, "devices", devices): + await ws_client.send_json( + { + ID: 2, + TYPE: "insteon/aldb/write", + DEVICE_ADDRESS: "33.33.33", + } + ) + msg = await ws_client.receive_json() + assert msg["success"] + assert devices["33.33.33"].aldb.async_write.call_count == 1 + assert devices["33.33.33"].aldb.async_load.call_count == 1 + assert devices.async_save.call_count == 1 + + +async def test_load_aldb(hass, hass_ws_client, aldb_data): + """Test loading an Insteon device's All-Link Database.""" + ws_client, devices = await _setup(hass, hass_ws_client, aldb_data) + + with patch.object(insteon.api.aldb, "devices", devices): + await ws_client.send_json( + { + ID: 2, + TYPE: "insteon/aldb/load", + DEVICE_ADDRESS: "AA.AA.AA", + } + ) + msg = await ws_client.receive_json() + assert msg["success"] + assert devices["AA.AA.AA"].aldb.async_load.call_count == 1 + assert devices.async_save.call_count == 1 + + +async def test_reset_aldb(hass, hass_ws_client, aldb_data): + """Test resetting an Insteon device's All-Link Database.""" + ws_client, devices = await _setup(hass, hass_ws_client, aldb_data) + record = _aldb_dict(4079) + devices["33.33.33"].aldb.modify( + mem_addr=record["mem_addr"], + in_use=record["in_use"], + group=record["group"], + controller=record["is_controller"], + target=record["target"], + data1=record["data1"], + data2=record["data2"], + data3=record["data3"], + ) + + assert devices["33.33.33"].aldb.pending_changes + with patch.object(insteon.api.aldb, "devices", devices): + await ws_client.send_json( + { + ID: 2, + TYPE: "insteon/aldb/reset", + DEVICE_ADDRESS: "33.33.33", + } + ) + msg = await ws_client.receive_json() + assert msg["success"] + assert not devices["33.33.33"].aldb.pending_changes + + +async def test_default_links(hass, hass_ws_client, aldb_data): + """Test getting an Insteon device's All-Link Database.""" + ws_client, devices = await _setup(hass, hass_ws_client, aldb_data) + + with patch.object(insteon.api.aldb, "devices", devices): + await ws_client.send_json( + { + ID: 2, + TYPE: "insteon/aldb/add_default_links", + DEVICE_ADDRESS: "33.33.33", + } + ) + msg = await ws_client.receive_json() + assert msg["success"] + assert devices["33.33.33"].async_add_default_links.call_count == 1 + assert devices["33.33.33"].aldb.async_load.call_count == 1 + assert devices.async_save.call_count == 1 + + +async def test_notify_on_aldb_status(hass, hass_ws_client, aldb_data): + """Test getting an Insteon device's All-Link Database.""" + ws_client, devices = await _setup(hass, hass_ws_client, aldb_data) + + with patch.object(insteon.api.aldb, "devices", devices): + await ws_client.send_json( + { + ID: 2, + TYPE: "insteon/aldb/notify", + DEVICE_ADDRESS: "33.33.33", + } + ) + msg = await ws_client.receive_json() + assert msg["success"] + + pub.sendMessage(f"333333.{ALDB_STATUS_CHANGED}") + msg = await ws_client.receive_json() + assert msg["event"]["type"] == "status_changed" + assert not msg["event"]["is_loading"] + + +async def test_notify_on_aldb_record_added(hass, hass_ws_client, aldb_data): + """Test getting an Insteon device's All-Link Database.""" + ws_client, devices = await _setup(hass, hass_ws_client, aldb_data) + + with patch.object(insteon.api.aldb, "devices", devices): + await ws_client.send_json( + { + ID: 2, + TYPE: "insteon/aldb/notify", + DEVICE_ADDRESS: "33.33.33", + } + ) + msg = await ws_client.receive_json() + assert msg["success"] + + pub.sendMessage( + f"{DEVICE_LINK_CONTROLLER_CREATED}.333333", + controller=Address("11.11.11"), + responder=Address("33.33.33"), + group=100, + ) + msg = await ws_client.receive_json() + assert msg["event"]["type"] == "record_loaded" + + +async def test_bad_address(hass, hass_ws_client, aldb_data): + """Test for a bad Insteon address.""" + ws_client, _ = await _setup(hass, hass_ws_client, aldb_data) + record = _aldb_dict(0) + + ws_id = 0 + for call in ["get", "write", "load", "reset", "add_default_links", "notify"]: + ws_id += 1 + await ws_client.send_json( + { + ID: ws_id, + TYPE: f"insteon/aldb/{call}", + DEVICE_ADDRESS: "99.99.99", + } + ) + msg = await ws_client.receive_json() + assert not msg["success"] + assert msg["error"]["message"] == INSTEON_DEVICE_NOT_FOUND + + for call in ["change", "create"]: + ws_id += 1 + await ws_client.send_json( + { + ID: ws_id, + TYPE: f"insteon/aldb/{call}", + DEVICE_ADDRESS: "99.99.99", + ALDB_RECORD: record, + } + ) + msg = await ws_client.receive_json() + assert not msg["success"] + assert msg["error"]["message"] == INSTEON_DEVICE_NOT_FOUND diff --git a/tests/components/insteon/test_api_device.py b/tests/components/insteon/test_api_device.py new file mode 100644 index 00000000000..528d44cc691 --- /dev/null +++ b/tests/components/insteon/test_api_device.py @@ -0,0 +1,139 @@ +"""Test the device level APIs.""" +from unittest.mock import patch + +from homeassistant.components import insteon +from homeassistant.components.insteon.api import async_load_api +from homeassistant.components.insteon.api.device import ( + DEVICE_ID, + HA_DEVICE_NOT_FOUND, + ID, + INSTEON_DEVICE_NOT_FOUND, + TYPE, + async_device_name, +) +from homeassistant.components.insteon.const import DOMAIN +from homeassistant.helpers.device_registry import async_get_registry + +from .const import MOCK_USER_INPUT_PLM +from .mock_devices import MockDevices + +from tests.common import MockConfigEntry + + +async def _async_setup(hass, hass_ws_client): + """Set up for tests.""" + config_entry = MockConfigEntry( + domain=DOMAIN, + entry_id="abcde12345", + data=MOCK_USER_INPUT_PLM, + options={}, + ) + config_entry.add_to_hass(hass) + async_load_api(hass) + + ws_client = await hass_ws_client(hass) + devices = MockDevices() + await devices.async_load() + + dev_reg = await async_get_registry(hass) + # Create device registry entry for mock node + ha_device = dev_reg.async_get_or_create( + config_entry_id=config_entry.entry_id, + identifiers={(DOMAIN, "11.11.11")}, + name="Device 11.11.11", + ) + return ws_client, devices, ha_device, dev_reg + + +async def test_get_device_api(hass, hass_ws_client): + """Test getting an Insteon device.""" + + ws_client, devices, ha_device, _ = await _async_setup(hass, hass_ws_client) + with patch.object(insteon.api.device, "devices", devices): + await ws_client.send_json( + {ID: 2, TYPE: "insteon/device/get", DEVICE_ID: ha_device.id} + ) + msg = await ws_client.receive_json() + result = msg["result"] + + assert result["name"] == "Device 11.11.11" + assert result["address"] == "11.11.11" + + +async def test_no_ha_device(hass, hass_ws_client): + """Test response when no HA device exists.""" + + ws_client, devices, _, _ = await _async_setup(hass, hass_ws_client) + with patch.object(insteon.api.device, "devices", devices): + await ws_client.send_json( + {ID: 2, TYPE: "insteon/device/get", DEVICE_ID: "not_a_device"} + ) + msg = await ws_client.receive_json() + assert not msg.get("result") + assert msg.get("error") + assert msg["error"]["message"] == HA_DEVICE_NOT_FOUND + + +async def test_no_insteon_device(hass, hass_ws_client): + """Test response when no Insteon device exists.""" + config_entry = MockConfigEntry( + domain=DOMAIN, + entry_id="abcde12345", + data=MOCK_USER_INPUT_PLM, + options={}, + ) + config_entry.add_to_hass(hass) + async_load_api(hass) + + ws_client = await hass_ws_client(hass) + devices = MockDevices() + await devices.async_load() + + dev_reg = await async_get_registry(hass) + # Create device registry entry for a Insteon device not in the Insteon devices list + ha_device_1 = dev_reg.async_get_or_create( + config_entry_id=config_entry.entry_id, + identifiers={(DOMAIN, "AA.BB.CC")}, + name="HA Device Only", + ) + # Create device registry entry for a non-Insteon device + ha_device_2 = dev_reg.async_get_or_create( + config_entry_id=config_entry.entry_id, + identifiers={("other_domain", "no address")}, + name="HA Device Only", + ) + with patch.object(insteon.api.device, "devices", devices): + await ws_client.send_json( + {ID: 2, TYPE: "insteon/device/get", DEVICE_ID: ha_device_1.id} + ) + msg = await ws_client.receive_json() + assert not msg.get("result") + assert msg.get("error") + assert msg["error"]["message"] == INSTEON_DEVICE_NOT_FOUND + + await ws_client.send_json( + {ID: 3, TYPE: "insteon/device/get", DEVICE_ID: ha_device_2.id} + ) + msg = await ws_client.receive_json() + assert not msg.get("result") + assert msg.get("error") + assert msg["error"]["message"] == INSTEON_DEVICE_NOT_FOUND + + +async def test_get_ha_device_name(hass, hass_ws_client): + """Test getting the HA device name from an Insteon address.""" + + _, devices, _, device_reg = await _async_setup(hass, hass_ws_client) + + with patch.object(insteon.api.device, "devices", devices): + # Test a real HA and Insteon device + name = await async_device_name(device_reg, "11.11.11") + assert name == "Device 11.11.11" + + # Test no HA device but a real Insteon device + name = await async_device_name(device_reg, "22.22.22") + assert name == "Device 22.22.22 (2)" + + # Test no HA or Insteon device + name = await async_device_name(device_reg, "BB.BB.BB") + assert name == "" diff --git a/tests/components/insteon/test_api_properties.py b/tests/components/insteon/test_api_properties.py new file mode 100644 index 00000000000..9b628f4443a --- /dev/null +++ b/tests/components/insteon/test_api_properties.py @@ -0,0 +1,425 @@ +"""Test the Insteon properties APIs.""" + +import json +from unittest.mock import patch + +import pytest + +from homeassistant.components import insteon +from homeassistant.components.insteon.api import async_load_api +from homeassistant.components.insteon.api.device import INSTEON_DEVICE_NOT_FOUND +from homeassistant.components.insteon.api.properties import ( + DEVICE_ADDRESS, + ID, + NON_TOGGLE_MASK, + NON_TOGGLE_OFF_MODE, + NON_TOGGLE_ON_MODE, + NON_TOGGLE_ON_OFF_MASK, + PROPERTY_NAME, + PROPERTY_VALUE, + RADIO_BUTTON_GROUP_PROP, + TOGGLE_MODES, + TOGGLE_ON_OFF_MODE, + TOGGLE_PROP, + TYPE, + _get_radio_button_properties, + _get_toggle_properties, +) + +from .mock_devices import MockDevices + +from tests.common import load_fixture + + +@pytest.fixture(name="properties_data", scope="session") +def aldb_data_fixture(): + """Load the controller state fixture data.""" + return json.loads(load_fixture("insteon/kpl_properties.json")) + + +async def _setup(hass, hass_ws_client, properties_data): + """Set up tests.""" + ws_client = await hass_ws_client(hass) + devices = MockDevices() + await devices.async_load() + devices.fill_properties("33.33.33", properties_data) + async_load_api(hass) + return ws_client, devices + + +async def test_get_properties(hass, hass_ws_client, properties_data): + """Test getting an Insteon device's properties.""" + ws_client, devices = await _setup(hass, hass_ws_client, properties_data) + + with patch.object(insteon.api.properties, "devices", devices): + await ws_client.send_json( + {ID: 2, TYPE: "insteon/properties/get", DEVICE_ADDRESS: "33.33.33"} + ) + msg = await ws_client.receive_json() + assert msg["success"] + assert len(msg["result"]["properties"]) == 54 + + +async def test_change_operating_flag(hass, hass_ws_client, properties_data): + """Test changing an Insteon device's properties.""" + ws_client, devices = await _setup(hass, hass_ws_client, properties_data) + + with patch.object(insteon.api.properties, "devices", devices): + await ws_client.send_json( + { + ID: 2, + TYPE: "insteon/properties/change", + DEVICE_ADDRESS: "33.33.33", + PROPERTY_NAME: "led_off", + PROPERTY_VALUE: True, + } + ) + msg = await ws_client.receive_json() + assert msg["success"] + assert devices["33.33.33"].operating_flags["led_off"].is_dirty + + +async def test_change_property(hass, hass_ws_client, properties_data): + """Test changing an Insteon device's properties.""" + ws_client, devices = await _setup(hass, hass_ws_client, properties_data) + + with patch.object(insteon.api.properties, "devices", devices): + await ws_client.send_json( + { + ID: 2, + TYPE: "insteon/properties/change", + DEVICE_ADDRESS: "33.33.33", + PROPERTY_NAME: "on_mask", + PROPERTY_VALUE: 100, + } + ) + msg = await ws_client.receive_json() + assert msg["success"] + assert devices["33.33.33"].properties["on_mask"].new_value == 100 + assert devices["33.33.33"].properties["on_mask"].is_dirty + + +async def test_change_ramp_rate_property(hass, hass_ws_client, properties_data): + """Test changing an Insteon device's properties.""" + ws_client, devices = await _setup(hass, hass_ws_client, properties_data) + + with patch.object(insteon.api.properties, "devices", devices): + await ws_client.send_json( + { + ID: 2, + TYPE: "insteon/properties/change", + DEVICE_ADDRESS: "33.33.33", + PROPERTY_NAME: "ramp_rate", + PROPERTY_VALUE: 4.5, + } + ) + msg = await ws_client.receive_json() + assert msg["success"] + assert devices["33.33.33"].properties["ramp_rate"].new_value == 0x1A + assert devices["33.33.33"].properties["ramp_rate"].is_dirty + + +async def test_change_radio_button_group(hass, hass_ws_client, properties_data): + """Test changing an Insteon device's properties.""" + ws_client, devices = await _setup(hass, hass_ws_client, properties_data) + rb_props, schema = _get_radio_button_properties(devices["33.33.33"]) + + # Make sure the baseline is correct + assert rb_props[0]["name"] == f"{RADIO_BUTTON_GROUP_PROP}0" + assert rb_props[0]["value"] == [4, 5] + assert rb_props[1]["value"] == [7, 8] + assert rb_props[2]["value"] == [] + assert schema[f"{RADIO_BUTTON_GROUP_PROP}0"]["options"].get(1) + assert schema[f"{RADIO_BUTTON_GROUP_PROP}1"]["options"].get(1) + assert devices["33.33.33"].properties["on_mask"].value == 0 + assert devices["33.33.33"].properties["off_mask"].value == 0 + assert not devices["33.33.33"].properties["on_mask"].is_dirty + assert not devices["33.33.33"].properties["off_mask"].is_dirty + + # Add button 1 to the group + rb_props[0]["value"].append(1) + with patch.object(insteon.api.properties, "devices", devices): + await ws_client.send_json( + { + ID: 2, + TYPE: "insteon/properties/change", + DEVICE_ADDRESS: "33.33.33", + PROPERTY_NAME: f"{RADIO_BUTTON_GROUP_PROP}0", + PROPERTY_VALUE: rb_props[0]["value"], + } + ) + msg = await ws_client.receive_json() + assert msg["success"] + + new_rb_props, _ = _get_radio_button_properties(devices["33.33.33"]) + assert 1 in new_rb_props[0]["value"] + assert 4 in new_rb_props[0]["value"] + assert 5 in new_rb_props[0]["value"] + assert schema[f"{RADIO_BUTTON_GROUP_PROP}0"]["options"].get(1) + assert schema[f"{RADIO_BUTTON_GROUP_PROP}1"]["options"].get(1) + + assert devices["33.33.33"].properties["on_mask"].new_value == 0x18 + assert devices["33.33.33"].properties["off_mask"].new_value == 0x18 + assert devices["33.33.33"].properties["on_mask"].is_dirty + assert devices["33.33.33"].properties["off_mask"].is_dirty + + # Remove button 5 + rb_props[0]["value"].remove(5) + await ws_client.send_json( + { + ID: 3, + TYPE: "insteon/properties/change", + DEVICE_ADDRESS: "33.33.33", + PROPERTY_NAME: f"{RADIO_BUTTON_GROUP_PROP}0", + PROPERTY_VALUE: rb_props[0]["value"], + } + ) + msg = await ws_client.receive_json() + assert msg["success"] + + new_rb_props, _ = _get_radio_button_properties(devices["33.33.33"]) + assert 1 in new_rb_props[0]["value"] + assert 4 in new_rb_props[0]["value"] + assert 5 not in new_rb_props[0]["value"] + assert schema[f"{RADIO_BUTTON_GROUP_PROP}0"]["options"].get(1) + assert schema[f"{RADIO_BUTTON_GROUP_PROP}1"]["options"].get(1) + + assert devices["33.33.33"].properties["on_mask"].new_value == 0x08 + assert devices["33.33.33"].properties["off_mask"].new_value == 0x08 + assert devices["33.33.33"].properties["on_mask"].is_dirty + assert devices["33.33.33"].properties["off_mask"].is_dirty + + # Remove button group 1 + rb_props[1]["value"] = [] + await ws_client.send_json( + { + ID: 5, + TYPE: "insteon/properties/change", + DEVICE_ADDRESS: "33.33.33", + PROPERTY_NAME: f"{RADIO_BUTTON_GROUP_PROP}1", + PROPERTY_VALUE: rb_props[1]["value"], + } + ) + msg = await ws_client.receive_json() + assert msg["success"] + + new_rb_props, _ = _get_radio_button_properties(devices["33.33.33"]) + assert len(new_rb_props) == 2 + assert new_rb_props[0]["value"] == [1, 4] + assert new_rb_props[1]["value"] == [] + + +async def test_create_radio_button_group(hass, hass_ws_client, properties_data): + """Test changing an Insteon device's properties.""" + ws_client, devices = await _setup(hass, hass_ws_client, properties_data) + rb_props, _ = _get_radio_button_properties(devices["33.33.33"]) + + # Make sure the baseline is correct + assert len(rb_props) == 3 + print(rb_props) + + rb_props[0]["value"].append("1") + + with patch.object(insteon.api.properties, "devices", devices): + await ws_client.send_json( + { + ID: 2, + TYPE: "insteon/properties/change", + DEVICE_ADDRESS: "33.33.33", + PROPERTY_NAME: f"{RADIO_BUTTON_GROUP_PROP}2", + PROPERTY_VALUE: ["1", "3"], + } + ) + msg = await ws_client.receive_json() + assert msg["success"] + + new_rb_props, new_schema = _get_radio_button_properties(devices["33.33.33"]) + assert len(new_rb_props) == 4 + assert 1 in new_rb_props[0]["value"] + assert new_schema[f"{RADIO_BUTTON_GROUP_PROP}0"]["options"].get(1) + assert not new_schema[f"{RADIO_BUTTON_GROUP_PROP}1"]["options"].get(1) + + assert devices["33.33.33"].properties["on_mask"].new_value == 4 + assert devices["33.33.33"].properties["off_mask"].new_value == 4 + assert devices["33.33.33"].properties["on_mask"].is_dirty + assert devices["33.33.33"].properties["off_mask"].is_dirty + + +async def test_change_toggle_property(hass, hass_ws_client, properties_data): + """Update a button's toggle mode.""" + ws_client, devices = await _setup(hass, hass_ws_client, properties_data) + device = devices["33.33.33"] + toggle_props, _ = _get_toggle_properties(devices["33.33.33"]) + + # Make sure the baseline is correct + assert toggle_props[0]["name"] == f"{TOGGLE_PROP}{device.groups[1].name}" + assert toggle_props[0]["value"] == TOGGLE_MODES[TOGGLE_ON_OFF_MODE] + assert toggle_props[1]["value"] == TOGGLE_MODES[NON_TOGGLE_ON_MODE] + assert device.properties[NON_TOGGLE_MASK].value == 2 + assert device.properties[NON_TOGGLE_ON_OFF_MASK].value == 2 + assert not device.properties[NON_TOGGLE_MASK].is_dirty + assert not device.properties[NON_TOGGLE_ON_OFF_MASK].is_dirty + + with patch.object(insteon.api.properties, "devices", devices): + await ws_client.send_json( + { + ID: 2, + TYPE: "insteon/properties/change", + DEVICE_ADDRESS: "33.33.33", + PROPERTY_NAME: toggle_props[0]["name"], + PROPERTY_VALUE: 1, + } + ) + msg = await ws_client.receive_json() + assert msg["success"] + + new_toggle_props, _ = _get_toggle_properties(devices["33.33.33"]) + assert new_toggle_props[0]["value"] == TOGGLE_MODES[NON_TOGGLE_ON_MODE] + assert device.properties[NON_TOGGLE_MASK].new_value == 3 + assert device.properties[NON_TOGGLE_ON_OFF_MASK].new_value == 3 + assert device.properties[NON_TOGGLE_MASK].is_dirty + assert device.properties[NON_TOGGLE_ON_OFF_MASK].is_dirty + + await ws_client.send_json( + { + ID: 3, + TYPE: "insteon/properties/change", + DEVICE_ADDRESS: "33.33.33", + PROPERTY_NAME: toggle_props[0]["name"], + PROPERTY_VALUE: 2, + } + ) + msg = await ws_client.receive_json() + assert msg["success"] + + new_toggle_props, _ = _get_toggle_properties(devices["33.33.33"]) + assert new_toggle_props[0]["value"] == TOGGLE_MODES[NON_TOGGLE_OFF_MODE] + assert device.properties[NON_TOGGLE_MASK].new_value == 3 + assert device.properties[NON_TOGGLE_ON_OFF_MASK].new_value is None + assert device.properties[NON_TOGGLE_MASK].is_dirty + assert not device.properties[NON_TOGGLE_ON_OFF_MASK].is_dirty + + await ws_client.send_json( + { + ID: 4, + TYPE: "insteon/properties/change", + DEVICE_ADDRESS: "33.33.33", + PROPERTY_NAME: toggle_props[1]["name"], + PROPERTY_VALUE: 0, + } + ) + msg = await ws_client.receive_json() + assert msg["success"] + + new_toggle_props, _ = _get_toggle_properties(devices["33.33.33"]) + assert new_toggle_props[1]["value"] == TOGGLE_MODES[TOGGLE_ON_OFF_MODE] + assert device.properties[NON_TOGGLE_MASK].new_value == 1 + assert device.properties[NON_TOGGLE_ON_OFF_MASK].new_value == 0 + assert device.properties[NON_TOGGLE_MASK].is_dirty + assert device.properties[NON_TOGGLE_ON_OFF_MASK].is_dirty + + +async def test_write_properties(hass, hass_ws_client, properties_data): + """Test getting an Insteon device's properties.""" + ws_client, devices = await _setup(hass, hass_ws_client, properties_data) + + with patch.object(insteon.api.properties, "devices", devices): + await ws_client.send_json( + {ID: 2, TYPE: "insteon/properties/write", DEVICE_ADDRESS: "33.33.33"} + ) + msg = await ws_client.receive_json() + assert msg["success"] + assert devices["33.33.33"].async_write_op_flags.call_count == 1 + assert devices["33.33.33"].async_write_ext_properties.call_count == 1 + + +async def test_write_properties_failure(hass, hass_ws_client, properties_data): + """Test getting an Insteon device's properties.""" + ws_client, devices = await _setup(hass, hass_ws_client, properties_data) + + with patch.object(insteon.api.properties, "devices", devices): + await ws_client.send_json( + {ID: 2, TYPE: "insteon/properties/write", DEVICE_ADDRESS: "22.22.22"} + ) + msg = await ws_client.receive_json() + assert not msg["success"] + assert msg["error"]["code"] == "write_failed" + + +async def test_load_properties(hass, hass_ws_client, properties_data): + """Test getting an Insteon device's properties.""" + ws_client, devices = await _setup(hass, hass_ws_client, properties_data) + + with patch.object(insteon.api.properties, "devices", devices): + await ws_client.send_json( + {ID: 2, TYPE: "insteon/properties/load", DEVICE_ADDRESS: "33.33.33"} + ) + msg = await ws_client.receive_json() + assert msg["success"] + assert devices["33.33.33"].async_read_op_flags.call_count == 1 + assert devices["33.33.33"].async_read_ext_properties.call_count == 1 + + +async def test_load_properties_failure(hass, hass_ws_client, properties_data): + """Test getting an Insteon device's properties.""" + ws_client, devices = await _setup(hass, hass_ws_client, properties_data) + + with patch.object(insteon.api.properties, "devices", devices): + await ws_client.send_json( + {ID: 2, TYPE: "insteon/properties/load", DEVICE_ADDRESS: "22.22.22"} + ) + msg = await ws_client.receive_json() + assert not msg["success"] + assert msg["error"]["code"] == "load_failed" + + +async def test_reset_properties(hass, hass_ws_client, properties_data): + """Test getting an Insteon device's properties.""" + ws_client, devices = await _setup(hass, hass_ws_client, properties_data) + + device = devices["33.33.33"] + device.operating_flags["led_off"].new_value = True + device.properties["on_mask"].new_value = 100 + assert device.operating_flags["led_off"].is_dirty + assert device.properties["on_mask"].is_dirty + with patch.object(insteon.api.properties, "devices", devices): + await ws_client.send_json( + {ID: 2, TYPE: "insteon/properties/reset", DEVICE_ADDRESS: "33.33.33"} + ) + msg = await ws_client.receive_json() + assert msg["success"] + assert not device.operating_flags["led_off"].is_dirty + assert not device.properties["on_mask"].is_dirty + + +async def test_bad_address(hass, hass_ws_client, properties_data): + """Test for a bad Insteon address.""" + ws_client, _ = await _setup(hass, hass_ws_client, properties_data) + + ws_id = 0 + for call in ["get", "write", "load", "reset"]: + ws_id += 1 + await ws_client.send_json( + { + ID: ws_id, + TYPE: f"insteon/properties/{call}", + DEVICE_ADDRESS: "99.99.99", + } + ) + msg = await ws_client.receive_json() + assert not msg["success"] + assert msg["error"]["message"] == INSTEON_DEVICE_NOT_FOUND + + ws_id += 1 + await ws_client.send_json( + { + ID: ws_id, + TYPE: "insteon/properties/change", + DEVICE_ADDRESS: "99.99.99", + PROPERTY_NAME: "led_off", + PROPERTY_VALUE: True, + } + ) + msg = await ws_client.receive_json() + assert not msg["success"] + assert msg["error"]["message"] == INSTEON_DEVICE_NOT_FOUND diff --git a/tests/fixtures/insteon/aldb_data.json b/tests/fixtures/insteon/aldb_data.json new file mode 100644 index 00000000000..2cab1dd5050 --- /dev/null +++ b/tests/fixtures/insteon/aldb_data.json @@ -0,0 +1,67 @@ +{ + "4095": { + "memory": 4095, + "in_use": true, + "controller": false, + "high_water_mark": false, + "bit5": true, + "bit4": false, + "group": 0, + "target": "aaaaaa", + "data1": 0, + "data2": 0, + "data3": 0 + }, + "4087": { + "memory": 4087, + "in_use": true, + "controller": true, + "high_water_mark": false, + "bit5": true, + "bit4": false, + "group": 1, + "target": "aaaaaa", + "data1": 0, + "data2": 0, + "data3": 0 + }, + "4079": { + "memory": 4079, + "in_use": true, + "controller": false, + "high_water_mark": false, + "bit5": true, + "bit4": false, + "group": 0, + "target": "111111", + "data1": 0, + "data2": 0, + "data3": 0 + }, + "4071": { + "memory": 4071, + "in_use": true, + "controller": true, + "high_water_mark": false, + "bit5": true, + "bit4": false, + "group": 2, + "target": "222222", + "data1": 0, + "data2": 0, + "data3": 0 + }, + "4063": { + "memory": 4063, + "in_use": true, + "controller": false, + "high_water_mark": false, + "bit5": true, + "bit4": false, + "group": 3, + "target": "333333", + "data1": 0, + "data2": 0, + "data3": 0 + } +} \ No newline at end of file diff --git a/tests/fixtures/insteon/kpl_properties.json b/tests/fixtures/insteon/kpl_properties.json new file mode 100644 index 00000000000..1115428a073 --- /dev/null +++ b/tests/fixtures/insteon/kpl_properties.json @@ -0,0 +1,66 @@ +{ + "operating_flags": { + "program_lock_on": false, + "blink_on_tx_on": false, + "resume_dim_on": false, + "led_on": false, + "key_beep_on": false, + "rf_disable_on": false, + "powerline_disable_on": false, + "blink_on_error_on": false + }, + "properties": { + "led_dimming": 10, + "non_toggle_mask": 2, + "non_toggle_on_off_mask": 2, + "trigger_group_mask": 0, + "on_mask": 0, + "off_mask": 0, + "x10_house": 32, + "x10_unit": 32, + "ramp_rate": 28, + "on_level": 255, + "on_mask_2": 0, + "off_mask_2": 0, + "x10_house_2": 32, + "x10_unit_2": 32, + "ramp_rate_2": 0, + "on_level_2": 0, + "on_mask_3": 0, + "off_mask_3": 0, + "x10_house_3": 32, + "x10_unit_3": 32, + "ramp_rate_3": 0, + "on_level_3": 0, + "on_mask_4": 16, + "off_mask_4": 16, + "x10_house_4": 32, + "x10_unit_4": 32, + "ramp_rate_4": 0, + "on_level_4": 0, + "on_mask_5": 0, + "off_mask_5": 0, + "x10_house_5": 32, + "x10_unit_5": 32, + "ramp_rate_5": 0, + "on_level_5": 0, + "on_mask_6": 0, + "off_mask_6": 0, + "x10_house_6": 32, + "x10_unit_6": 32, + "ramp_rate_6": 0, + "on_level_6": 0, + "on_mask_7": 128, + "off_mask_7": 128, + "x10_house_7": 32, + "x10_unit_7": 32, + "ramp_rate_7": 0, + "on_level_7": 0, + "on_mask_8": 64, + "off_mask_8": 64, + "x10_house_8": 32, + "x10_unit_8": 2, + "ramp_rate_8": 98, + "on_level_8": 74 + } +} \ No newline at end of file