"""Lovelace UI.""" import logging import uuid from functools import wraps from typing import Dict, List, Union import voluptuous as vol import homeassistant.util.ruamel_yaml as yaml from homeassistant.components import websocket_api from homeassistant.exceptions import HomeAssistantError _LOGGER = logging.getLogger(__name__) DOMAIN = 'lovelace' LOVELACE_CONFIG_FILE = 'ui-lovelace.yaml' JSON_TYPE = Union[List, Dict, str] # pylint: disable=invalid-name FORMAT_YAML = 'yaml' FORMAT_JSON = 'json' OLD_WS_TYPE_GET_LOVELACE_UI = 'frontend/lovelace_config' WS_TYPE_GET_LOVELACE_UI = 'lovelace/config' WS_TYPE_MIGRATE_CONFIG = 'lovelace/config/migrate' WS_TYPE_GET_CARD = 'lovelace/config/card/get' WS_TYPE_UPDATE_CARD = 'lovelace/config/card/update' WS_TYPE_ADD_CARD = 'lovelace/config/card/add' WS_TYPE_MOVE_CARD = 'lovelace/config/card/move' WS_TYPE_DELETE_CARD = 'lovelace/config/card/delete' SCHEMA_GET_LOVELACE_UI = websocket_api.BASE_COMMAND_MESSAGE_SCHEMA.extend({ vol.Required('type'): vol.Any(WS_TYPE_GET_LOVELACE_UI, OLD_WS_TYPE_GET_LOVELACE_UI), }) SCHEMA_MIGRATE_CONFIG = websocket_api.BASE_COMMAND_MESSAGE_SCHEMA.extend({ vol.Required('type'): WS_TYPE_MIGRATE_CONFIG, }) SCHEMA_GET_CARD = websocket_api.BASE_COMMAND_MESSAGE_SCHEMA.extend({ vol.Required('type'): WS_TYPE_GET_CARD, vol.Required('card_id'): str, vol.Optional('format', default=FORMAT_YAML): vol.Any(FORMAT_JSON, FORMAT_YAML), }) SCHEMA_UPDATE_CARD = websocket_api.BASE_COMMAND_MESSAGE_SCHEMA.extend({ vol.Required('type'): WS_TYPE_UPDATE_CARD, vol.Required('card_id'): str, vol.Required('card_config'): vol.Any(str, Dict), vol.Optional('format', default=FORMAT_YAML): vol.Any(FORMAT_JSON, FORMAT_YAML), }) SCHEMA_ADD_CARD = websocket_api.BASE_COMMAND_MESSAGE_SCHEMA.extend({ vol.Required('type'): WS_TYPE_ADD_CARD, vol.Required('view_id'): str, vol.Required('card_config'): vol.Any(str, Dict), vol.Optional('position'): int, vol.Optional('format', default=FORMAT_YAML): vol.Any(FORMAT_JSON, FORMAT_YAML), }) SCHEMA_MOVE_CARD = websocket_api.BASE_COMMAND_MESSAGE_SCHEMA.extend({ vol.Required('type'): WS_TYPE_MOVE_CARD, vol.Required('card_id'): str, vol.Optional('new_position'): int, vol.Optional('new_view_id'): str, }) SCHEMA_DELETE_CARD = websocket_api.BASE_COMMAND_MESSAGE_SCHEMA.extend({ vol.Required('type'): WS_TYPE_DELETE_CARD, vol.Required('card_id'): str, }) class CardNotFoundError(HomeAssistantError): """Card not found in data.""" class ViewNotFoundError(HomeAssistantError): """View not found in data.""" class DuplicateIdError(HomeAssistantError): """Duplicate ID's.""" def load_config(fname: str) -> JSON_TYPE: """Load a YAML file.""" return yaml.load_yaml(fname, False) def migrate_config(fname: str) -> None: """Add id to views and cards if not present and check duplicates.""" config = yaml.load_yaml(fname, True) updated = False seen_card_ids = set() seen_view_ids = set() index = 0 for view in config.get('views', []): view_id = str(view.get('id', '')) if not view_id: updated = True view.insert(0, 'id', index, comment="Automatically created id") else: if view_id in seen_view_ids: raise DuplicateIdError( 'ID `{}` has multiple occurances in views'.format(view_id)) seen_view_ids.add(view_id) for card in view.get('cards', []): card_id = str(card.get('id', '')) if not card_id: updated = True card.insert(0, 'id', uuid.uuid4().hex, comment="Automatically created id") else: if card_id in seen_card_ids: raise DuplicateIdError( 'ID `{}` has multiple occurances in cards' .format(card_id)) seen_card_ids.add(card_id) index += 1 if updated: yaml.save_yaml(fname, config) def get_card(fname: str, card_id: str, data_format: str = FORMAT_YAML)\ -> JSON_TYPE: """Load a specific card config for id.""" round_trip = data_format == FORMAT_YAML config = yaml.load_yaml(fname, round_trip) for view in config.get('views', []): for card in view.get('cards', []): if str(card.get('id', '')) != card_id: continue if data_format == FORMAT_YAML: return yaml.object_to_yaml(card) return card raise CardNotFoundError( "Card with ID: {} was not found in {}.".format(card_id, fname)) def update_card(fname: str, card_id: str, card_config: str, data_format: str = FORMAT_YAML) -> None: """Save a specific card config for id.""" config = yaml.load_yaml(fname, True) for view in config.get('views', []): for card in view.get('cards', []): if str(card.get('id', '')) != card_id: continue if data_format == FORMAT_YAML: card_config = yaml.yaml_to_object(card_config) card.update(card_config) yaml.save_yaml(fname, config) return raise CardNotFoundError( "Card with ID: {} was not found in {}.".format(card_id, fname)) def add_card(fname: str, view_id: str, card_config: str, position: int = None, data_format: str = FORMAT_YAML) -> None: """Add a card to a view.""" config = yaml.load_yaml(fname, True) for view in config.get('views', []): if str(view.get('id', '')) != view_id: continue cards = view.get('cards', []) if data_format == FORMAT_YAML: card_config = yaml.yaml_to_object(card_config) if position is None: cards.append(card_config) else: cards.insert(position, card_config) yaml.save_yaml(fname, config) return raise ViewNotFoundError( "View with ID: {} was not found in {}.".format(view_id, fname)) def move_card(fname: str, card_id: str, position: int = None) -> None: """Move a card to a different position.""" if position is None: raise HomeAssistantError('Position is required if view is not\ specified.') config = yaml.load_yaml(fname, True) for view in config.get('views', []): for card in view.get('cards', []): if str(card.get('id', '')) != card_id: continue cards = view.get('cards') cards.insert(position, cards.pop(cards.index(card))) yaml.save_yaml(fname, config) return raise CardNotFoundError( "Card with ID: {} was not found in {}.".format(card_id, fname)) def move_card_view(fname: str, card_id: str, view_id: str, position: int = None) -> None: """Move a card to a different view.""" config = yaml.load_yaml(fname, True) for view in config.get('views', []): if str(view.get('id', '')) == view_id: destination = view.get('cards') for card in view.get('cards'): if str(card.get('id', '')) != card_id: continue origin = view.get('cards') card_to_move = card if 'destination' not in locals(): raise ViewNotFoundError( "View with ID: {} was not found in {}.".format(view_id, fname)) if 'card_to_move' not in locals(): raise CardNotFoundError( "Card with ID: {} was not found in {}.".format(card_id, fname)) origin.pop(origin.index(card_to_move)) if position is None: destination.append(card_to_move) else: destination.insert(position, card_to_move) yaml.save_yaml(fname, config) def delete_card(fname: str, card_id: str, position: int = None) -> None: """Delete a card from view.""" config = yaml.load_yaml(fname, True) for view in config.get('views', []): for card in view.get('cards', []): if str(card.get('id', '')) != card_id: continue cards = view.get('cards') cards.pop(cards.index(card)) yaml.save_yaml(fname, config) return raise CardNotFoundError( "Card with ID: {} was not found in {}.".format(card_id, fname)) async def async_setup(hass, config): """Set up the Lovelace commands.""" # Backwards compat. Added in 0.80. Remove after 0.85 hass.components.websocket_api.async_register_command( OLD_WS_TYPE_GET_LOVELACE_UI, websocket_lovelace_config, SCHEMA_GET_LOVELACE_UI) hass.components.websocket_api.async_register_command( WS_TYPE_MIGRATE_CONFIG, websocket_lovelace_migrate_config, SCHEMA_MIGRATE_CONFIG) hass.components.websocket_api.async_register_command( WS_TYPE_GET_LOVELACE_UI, websocket_lovelace_config, SCHEMA_GET_LOVELACE_UI) hass.components.websocket_api.async_register_command( WS_TYPE_GET_CARD, websocket_lovelace_get_card, SCHEMA_GET_CARD) hass.components.websocket_api.async_register_command( WS_TYPE_UPDATE_CARD, websocket_lovelace_update_card, SCHEMA_UPDATE_CARD) hass.components.websocket_api.async_register_command( WS_TYPE_ADD_CARD, websocket_lovelace_add_card, SCHEMA_ADD_CARD) hass.components.websocket_api.async_register_command( WS_TYPE_MOVE_CARD, websocket_lovelace_move_card, SCHEMA_MOVE_CARD) hass.components.websocket_api.async_register_command( WS_TYPE_DELETE_CARD, websocket_lovelace_delete_card, SCHEMA_DELETE_CARD) return True def handle_yaml_errors(func): """Handle error with websocket calls.""" @wraps(func) async def send_with_error_handling(hass, connection, msg): error = None try: result = await func(hass, connection, msg) message = websocket_api.result_message( msg['id'], result ) except FileNotFoundError: error = ('file_not_found', 'Could not find ui-lovelace.yaml in your config dir.') except yaml.UnsupportedYamlError as err: error = 'unsupported_error', str(err) except yaml.WriteError as err: error = 'write_error', str(err) except CardNotFoundError as err: error = 'card_not_found', str(err) except ViewNotFoundError as err: error = 'view_not_found', str(err) except HomeAssistantError as err: error = 'error', str(err) if error is not None: message = websocket_api.error_message(msg['id'], *error) connection.send_message(message) return send_with_error_handling @websocket_api.async_response @handle_yaml_errors async def websocket_lovelace_config(hass, connection, msg): """Send lovelace UI config over websocket config.""" return await hass.async_add_executor_job( load_config, hass.config.path(LOVELACE_CONFIG_FILE)) @websocket_api.async_response @handle_yaml_errors async def websocket_lovelace_migrate_config(hass, connection, msg): """Migrate lovelace UI config.""" return await hass.async_add_executor_job( migrate_config, hass.config.path(LOVELACE_CONFIG_FILE)) @websocket_api.async_response @handle_yaml_errors async def websocket_lovelace_get_card(hass, connection, msg): """Send lovelace card config over websocket config.""" return await hass.async_add_executor_job( get_card, hass.config.path(LOVELACE_CONFIG_FILE), msg['card_id'], msg.get('format', FORMAT_YAML)) @websocket_api.async_response @handle_yaml_errors async def websocket_lovelace_update_card(hass, connection, msg): """Receive lovelace card config over websocket and save.""" return await hass.async_add_executor_job( update_card, hass.config.path(LOVELACE_CONFIG_FILE), msg['card_id'], msg['card_config'], msg.get('format', FORMAT_YAML)) @websocket_api.async_response @handle_yaml_errors async def websocket_lovelace_add_card(hass, connection, msg): """Add new card to view over websocket and save.""" return await hass.async_add_executor_job( add_card, hass.config.path(LOVELACE_CONFIG_FILE), msg['view_id'], msg['card_config'], msg.get('position'), msg.get('format', FORMAT_YAML)) @websocket_api.async_response @handle_yaml_errors async def websocket_lovelace_move_card(hass, connection, msg): """Move card to different position over websocket and save.""" if 'new_view_id' in msg: return await hass.async_add_executor_job( move_card_view, hass.config.path(LOVELACE_CONFIG_FILE), msg['card_id'], msg['new_view_id'], msg.get('new_position')) return await hass.async_add_executor_job( move_card, hass.config.path(LOVELACE_CONFIG_FILE), msg['card_id'], msg.get('new_position')) @websocket_api.async_response @handle_yaml_errors async def websocket_lovelace_delete_card(hass, connection, msg): """Delete card from lovelace over websocket and save.""" return await hass.async_add_executor_job( delete_card, hass.config.path(LOVELACE_CONFIG_FILE), msg['card_id'])