"""Hyperion config flow."""
from __future__ import annotations

import asyncio
import logging
from typing import Any, Dict, Optional
from urllib.parse import urlparse

from hyperion import client, const
import voluptuous as vol

from homeassistant.components.ssdp import ATTR_SSDP_LOCATION, ATTR_UPNP_SERIAL
from homeassistant.config_entries import (
    CONN_CLASS_LOCAL_PUSH,
    SOURCE_REAUTH,
    ConfigEntry,
    ConfigFlow,
    OptionsFlow,
)
from homeassistant.const import (
    CONF_BASE,
    CONF_HOST,
    CONF_ID,
    CONF_PORT,
    CONF_SOURCE,
    CONF_TOKEN,
)
from homeassistant.core import callback
from homeassistant.helpers.typing import ConfigType

from . import create_hyperion_client

# pylint: disable=unused-import
from .const import (
    CONF_AUTH_ID,
    CONF_CREATE_TOKEN,
    CONF_PRIORITY,
    DEFAULT_ORIGIN,
    DEFAULT_PRIORITY,
    DOMAIN,
)

_LOGGER = logging.getLogger(__name__)
_LOGGER.setLevel(logging.DEBUG)

#  +------------------+ +------------------+ +--------------------+ +--------------------+
#  |Step: SSDP        | |Step: user        | |Step: import        | |Step: reauth        |
#  |                  | |                  | |                    | |                    |
#  |Input: <discovery>| |Input: <host/port>| |Input: <import data>| |Input: <entry_data> |
#  +------------------+ +------------------+ +--------------------+ +--------------------+
#           v                   v                       v                    v
#           +-------------------+-----------------------+--------------------+
# Auth not  |         Auth      |
# required? |         required? |
#           |                   v
#           |                +------------+
#           |                |Step: auth  |
#           |                |            |
#           |                |Input: token|
#           |                +------------+
#           |    Static         |
#           v    token          |
#            <------------------+
#           |                   |
#           |                   | New token
#           |                   v
#           |            +------------------+
#           |            |Step: create_token|
#           |            +------------------+
#           |                   |
#           |                   v
#           |            +---------------------------+   +--------------------------------+
#           |            |Step: create_token_external|-->|Step: create_token_external_fail|
#           |            +---------------------------+   +--------------------------------+
#           |                   |
#           |                   v
#           |            +-----------------------------------+
#           |            |Step: create_token_external_success|
#           |            +-----------------------------------+
#           |                   |
#           v<------------------+
#           |
#           v
#     +-------------+  Confirm not required?
#     |Step: Confirm|---------------------->+
#     +-------------+                       |
#           |                               |
#           v SSDP: Explicit confirm        |
#           +------------------------------>+
#                                           |
#                                           v
#                                 +----------------+
#                                 | Create/Update! |
#                                 +----------------+

# A note on choice of discovery mechanisms: Hyperion supports both Zeroconf and SSDP out
# of the box. This config flow needs two port numbers from the Hyperion instance, the
# JSON port (for the API) and the UI port (for the user to approve dynamically created
# auth tokens). With Zeroconf the port numbers for both are in different Zeroconf
# entries, and as Home Assistant only passes a single entry into the config flow, we can
# only conveniently 'see' one port or the other (which means we need to guess one port
# number). With SSDP, we get the combined block including both port numbers, so SSDP is
# the favored discovery implementation.


class HyperionConfigFlow(ConfigFlow, domain=DOMAIN):
    """Handle a Hyperion config flow."""

    VERSION = 1
    CONNECTION_CLASS = CONN_CLASS_LOCAL_PUSH

    def __init__(self) -> None:
        """Instantiate config flow."""
        self._data: Dict[str, Any] = {}
        self._request_token_task: Optional[asyncio.Task] = None
        self._auth_id: Optional[str] = None
        self._require_confirm: bool = False
        self._port_ui: int = const.DEFAULT_PORT_UI

    def _create_client(self, raw_connection: bool = False) -> client.HyperionClient:
        """Create and connect a client instance."""
        return create_hyperion_client(
            self._data[CONF_HOST],
            self._data[CONF_PORT],
            token=self._data.get(CONF_TOKEN),
            raw_connection=raw_connection,
        )

    async def _advance_to_auth_step_if_necessary(
        self, hyperion_client: client.HyperionClient
    ) -> Dict[str, Any]:
        """Determine if auth is required."""
        auth_resp = await hyperion_client.async_is_auth_required()

        # Could not determine if auth is required.
        if not auth_resp or not client.ResponseOK(auth_resp):
            return self.async_abort(reason="auth_required_error")
        auth_required = auth_resp.get(const.KEY_INFO, {}).get(const.KEY_REQUIRED, False)
        if auth_required:
            return await self.async_step_auth()
        return await self.async_step_confirm()

    async def async_step_reauth(
        self,
        config_data: ConfigType,
    ) -> Dict[str, Any]:
        """Handle a reauthentication flow."""
        self._data = dict(config_data)
        async with self._create_client(raw_connection=True) as hyperion_client:
            if not hyperion_client:
                return self.async_abort(reason="cannot_connect")
            return await self._advance_to_auth_step_if_necessary(hyperion_client)

    async def async_step_ssdp(  # type: ignore[override]
        self, discovery_info: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Handle a flow initiated by SSDP."""
        # Sample data provided by SSDP: {
        #   'ssdp_location': 'http://192.168.0.1:8090/description.xml',
        #   'ssdp_st': 'upnp:rootdevice',
        #   'deviceType': 'urn:schemas-upnp-org:device:Basic:1',
        #   'friendlyName': 'Hyperion (192.168.0.1)',
        #   'manufacturer': 'Hyperion Open Source Ambient Lighting',
        #   'manufacturerURL': 'https://www.hyperion-project.org',
        #   'modelDescription': 'Hyperion Open Source Ambient Light',
        #   'modelName': 'Hyperion',
        #   'modelNumber': '2.0.0-alpha.8',
        #   'modelURL': 'https://www.hyperion-project.org',
        #   'serialNumber': 'f9aab089-f85a-55cf-b7c1-222a72faebe9',
        #   'UDN': 'uuid:f9aab089-f85a-55cf-b7c1-222a72faebe9',
        #   'ports': {
        #       'jsonServer': '19444',
        #       'sslServer': '8092',
        #       'protoBuffer': '19445',
        #       'flatBuffer': '19400'
        #   },
        #   'presentationURL': 'index.html',
        #   'iconList': {
        #       'icon': {
        #           'mimetype': 'image/png',
        #           'height': '100',
        #           'width': '100',
        #           'depth': '32',
        #           'url': 'img/hyperion/ssdp_icon.png'
        #       }
        #   },
        #   'ssdp_usn': 'uuid:f9aab089-f85a-55cf-b7c1-222a72faebe9',
        #   'ssdp_ext': '',
        #   'ssdp_server': 'Raspbian GNU/Linux 10 (buster)/10 UPnP/1.0 Hyperion/2.0.0-alpha.8'}

        # SSDP requires user confirmation.
        self._require_confirm = True
        self._data[CONF_HOST] = urlparse(discovery_info[ATTR_SSDP_LOCATION]).hostname
        try:
            self._port_ui = urlparse(discovery_info[ATTR_SSDP_LOCATION]).port
        except ValueError:
            self._port_ui = const.DEFAULT_PORT_UI

        try:
            self._data[CONF_PORT] = int(
                discovery_info.get("ports", {}).get(
                    "jsonServer", const.DEFAULT_PORT_JSON
                )
            )
        except ValueError:
            self._data[CONF_PORT] = const.DEFAULT_PORT_JSON

        hyperion_id = discovery_info.get(ATTR_UPNP_SERIAL)
        if not hyperion_id:
            return self.async_abort(reason="no_id")

        # For discovery mechanisms, we set the unique_id as early as possible to
        # avoid discovery popping up a duplicate on the screen. The unique_id is set
        # authoritatively later in the flow by asking the server to confirm its id
        # (which should theoretically be the same as specified here)
        await self.async_set_unique_id(hyperion_id)
        self._abort_if_unique_id_configured()

        async with self._create_client(raw_connection=True) as hyperion_client:
            if not hyperion_client:
                return self.async_abort(reason="cannot_connect")
            return await self._advance_to_auth_step_if_necessary(hyperion_client)

    # pylint: disable=arguments-differ
    async def async_step_user(
        self,
        user_input: Optional[ConfigType] = None,
    ) -> Dict[str, Any]:
        """Handle a flow initiated by the user."""
        errors = {}
        if user_input:
            self._data.update(user_input)

            async with self._create_client(raw_connection=True) as hyperion_client:
                if hyperion_client:
                    return await self._advance_to_auth_step_if_necessary(
                        hyperion_client
                    )
                errors[CONF_BASE] = "cannot_connect"

        return self.async_show_form(
            step_id="user",
            data_schema=vol.Schema(
                {
                    vol.Required(CONF_HOST): str,
                    vol.Optional(CONF_PORT, default=const.DEFAULT_PORT_JSON): int,
                }
            ),
            errors=errors,
        )

    async def _cancel_request_token_task(self) -> None:
        """Cancel the request token task if it exists."""
        if self._request_token_task is not None:
            if not self._request_token_task.done():
                self._request_token_task.cancel()

            try:
                await self._request_token_task
            except asyncio.CancelledError:
                pass
            self._request_token_task = None

    async def _request_token_task_func(self, auth_id: str) -> None:
        """Send an async_request_token request."""
        auth_resp: Optional[Dict[str, Any]] = None
        async with self._create_client(raw_connection=True) as hyperion_client:
            if hyperion_client:
                # The Hyperion-py client has a default timeout of 3 minutes on this request.
                auth_resp = await hyperion_client.async_request_token(
                    comment=DEFAULT_ORIGIN, id=auth_id
                )
            assert self.hass
            await self.hass.config_entries.flow.async_configure(
                flow_id=self.flow_id, user_input=auth_resp
            )

    def _get_hyperion_url(self) -> str:
        """Return the URL of the Hyperion UI."""
        # If this flow was kicked off by SSDP, this will be the correct frontend URL. If
        # this is a manual flow instantiation, then it will be a best guess (as this
        # flow does not have that information available to it). This is only used for
        # approving new dynamically created tokens, so the complexity of asking the user
        # manually for this information is likely not worth it (when it would only be
        # used to open a URL, that the user already knows the address of).
        return f"http://{self._data[CONF_HOST]}:{self._port_ui}"

    async def _can_login(self) -> Optional[bool]:
        """Verify login details."""
        async with self._create_client(raw_connection=True) as hyperion_client:
            if not hyperion_client:
                return None
            return bool(
                client.LoginResponseOK(
                    await hyperion_client.async_login(token=self._data[CONF_TOKEN])
                )
            )

    async def async_step_auth(
        self,
        user_input: Optional[ConfigType] = None,
    ) -> Dict[str, Any]:
        """Handle the auth step of a flow."""
        errors = {}
        if user_input:
            if user_input.get(CONF_CREATE_TOKEN):
                return await self.async_step_create_token()

            # Using a static token.
            self._data[CONF_TOKEN] = user_input.get(CONF_TOKEN)
            login_ok = await self._can_login()
            if login_ok is None:
                return self.async_abort(reason="cannot_connect")
            if login_ok:
                return await self.async_step_confirm()
            errors[CONF_BASE] = "invalid_access_token"

        return self.async_show_form(
            step_id="auth",
            data_schema=vol.Schema(
                {
                    vol.Required(CONF_CREATE_TOKEN): bool,
                    vol.Optional(CONF_TOKEN): str,
                }
            ),
            errors=errors,
        )

    async def async_step_create_token(
        self, user_input: Optional[ConfigType] = None
    ) -> Dict[str, Any]:
        """Send a request for a new token."""
        if user_input is None:
            self._auth_id = client.generate_random_auth_id()
            return self.async_show_form(
                step_id="create_token",
                description_placeholders={
                    CONF_AUTH_ID: self._auth_id,
                },
            )

        # Cancel the request token task if it's already running, then re-create it.
        await self._cancel_request_token_task()
        # Start a task in the background requesting a new token. The next step will
        # wait on the response (which includes the user needing to visit the Hyperion
        # UI to approve the request for a new token).
        assert self.hass
        assert self._auth_id is not None
        self._request_token_task = self.hass.async_create_task(
            self._request_token_task_func(self._auth_id)
        )
        return self.async_external_step(
            step_id="create_token_external", url=self._get_hyperion_url()
        )

    async def async_step_create_token_external(
        self, auth_resp: Optional[ConfigType] = None
    ) -> Dict[str, Any]:
        """Handle completion of the request for a new token."""
        if auth_resp is not None and client.ResponseOK(auth_resp):
            token = auth_resp.get(const.KEY_INFO, {}).get(const.KEY_TOKEN)
            if token:
                self._data[CONF_TOKEN] = token
                return self.async_external_step_done(
                    next_step_id="create_token_success"
                )
        return self.async_external_step_done(next_step_id="create_token_fail")

    async def async_step_create_token_success(
        self, _: Optional[ConfigType] = None
    ) -> Dict[str, Any]:
        """Create an entry after successful token creation."""
        # Clean-up the request task.
        await self._cancel_request_token_task()

        # Test the token.
        login_ok = await self._can_login()

        if login_ok is None:
            return self.async_abort(reason="cannot_connect")
        if not login_ok:
            return self.async_abort(reason="auth_new_token_not_work_error")
        return await self.async_step_confirm()

    async def async_step_create_token_fail(
        self, _: Optional[ConfigType] = None
    ) -> Dict[str, Any]:
        """Show an error on the auth form."""
        # Clean-up the request task.
        await self._cancel_request_token_task()
        return self.async_abort(reason="auth_new_token_not_granted_error")

    async def async_step_confirm(
        self, user_input: Optional[ConfigType] = None
    ) -> Dict[str, Any]:
        """Get final confirmation before entry creation."""
        if user_input is None and self._require_confirm:
            return self.async_show_form(
                step_id="confirm",
                description_placeholders={
                    CONF_HOST: self._data[CONF_HOST],
                    CONF_PORT: self._data[CONF_PORT],
                    CONF_ID: self.unique_id,
                },
            )

        async with self._create_client() as hyperion_client:
            if not hyperion_client:
                return self.async_abort(reason="cannot_connect")
            hyperion_id = await hyperion_client.async_sysinfo_id()

        if not hyperion_id:
            return self.async_abort(reason="no_id")

        entry = await self.async_set_unique_id(hyperion_id, raise_on_progress=False)

        # pylint: disable=no-member
        if self.context.get(CONF_SOURCE) == SOURCE_REAUTH and entry is not None:
            assert self.hass
            self.hass.config_entries.async_update_entry(entry, data=self._data)
            # Need to manually reload, as the listener won't have been installed because
            # the initial load did not succeed (the reauth flow will not be initiated if
            # the load succeeds)
            await self.hass.config_entries.async_reload(entry.entry_id)
            return self.async_abort(reason="reauth_successful")

        self._abort_if_unique_id_configured()

        # pylint: disable=no-member  # https://github.com/PyCQA/pylint/issues/3167
        return self.async_create_entry(
            title=f"{self._data[CONF_HOST]}:{self._data[CONF_PORT]}", data=self._data
        )

    @staticmethod
    @callback
    def async_get_options_flow(config_entry: ConfigEntry) -> HyperionOptionsFlow:
        """Get the Hyperion Options flow."""
        return HyperionOptionsFlow(config_entry)


class HyperionOptionsFlow(OptionsFlow):
    """Hyperion options flow."""

    def __init__(self, config_entry: ConfigEntry):
        """Initialize a Hyperion options flow."""
        self._config_entry = config_entry

    async def async_step_init(
        self, user_input: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Manage the options."""
        if user_input is not None:
            return self.async_create_entry(title="", data=user_input)

        return self.async_show_form(
            step_id="init",
            data_schema=vol.Schema(
                {
                    vol.Optional(
                        CONF_PRIORITY,
                        default=self._config_entry.options.get(
                            CONF_PRIORITY, DEFAULT_PRIORITY
                        ),
                    ): vol.All(vol.Coerce(int), vol.Range(min=0, max=255)),
                }
            ),
        )