"""Test the runner."""
import asyncio
import threading
from unittest.mock import patch

import py
import pytest

from homeassistant import core, runner
from homeassistant.core import HomeAssistant
from homeassistant.util import executor, thread

# https://github.com/home-assistant/supervisor/blob/main/supervisor/docker/homeassistant.py
SUPERVISOR_HARD_TIMEOUT = 220

TIMEOUT_SAFETY_MARGIN = 10


async def test_cumulative_shutdown_timeout_less_than_supervisor() -> None:
    """Verify the cumulative shutdown timeout is at least 10s less than the supervisor."""
    assert (
        core.STAGE_1_SHUTDOWN_TIMEOUT
        + core.STAGE_2_SHUTDOWN_TIMEOUT
        + core.STAGE_3_SHUTDOWN_TIMEOUT
        + executor.EXECUTOR_SHUTDOWN_TIMEOUT
        + thread.THREADING_SHUTDOWN_TIMEOUT
        + TIMEOUT_SAFETY_MARGIN
        <= SUPERVISOR_HARD_TIMEOUT
    )


async def test_setup_and_run_hass(hass: HomeAssistant, tmpdir: py.path.local) -> None:
    """Test we can setup and run."""
    test_dir = tmpdir.mkdir("config")
    default_config = runner.RuntimeConfig(test_dir)

    with patch("homeassistant.bootstrap.async_setup_hass", return_value=hass), patch(
        "threading._shutdown"
    ), patch("homeassistant.core.HomeAssistant.async_run") as mock_run:
        await runner.setup_and_run_hass(default_config)
        assert threading._shutdown == thread.deadlock_safe_shutdown

    assert mock_run.called


def test_run(hass: HomeAssistant, tmpdir: py.path.local) -> None:
    """Test we can run."""
    test_dir = tmpdir.mkdir("config")
    default_config = runner.RuntimeConfig(test_dir)

    with patch.object(runner, "TASK_CANCELATION_TIMEOUT", 1), patch(
        "homeassistant.bootstrap.async_setup_hass", return_value=hass
    ), patch("threading._shutdown"), patch(
        "homeassistant.core.HomeAssistant.async_run"
    ) as mock_run:
        runner.run(default_config)

    assert mock_run.called


def test_run_executor_shutdown_throws(
    hass: HomeAssistant, tmpdir: py.path.local
) -> None:
    """Test we can run and we still shutdown if the executor shutdown throws."""
    test_dir = tmpdir.mkdir("config")
    default_config = runner.RuntimeConfig(test_dir)

    with patch.object(runner, "TASK_CANCELATION_TIMEOUT", 1), pytest.raises(
        RuntimeError
    ), patch("homeassistant.bootstrap.async_setup_hass", return_value=hass), patch(
        "threading._shutdown"
    ), patch(
        "homeassistant.runner.InterruptibleThreadPoolExecutor.shutdown",
        side_effect=RuntimeError,
    ) as mock_shutdown, patch(
        "homeassistant.core.HomeAssistant.async_run"
    ) as mock_run:
        runner.run(default_config)

    assert mock_shutdown.called
    assert mock_run.called


def test_run_does_not_block_forever_with_shielded_task(
    hass: HomeAssistant, tmpdir: py.path.local, caplog: pytest.LogCaptureFixture
) -> None:
    """Test we can shutdown and not block forever."""
    test_dir = tmpdir.mkdir("config")
    default_config = runner.RuntimeConfig(test_dir)
    tasks = []

    async def _async_create_tasks(*_):
        async def async_raise(*_):
            try:
                await asyncio.sleep(2)
            except asyncio.CancelledError:
                raise Exception

        async def async_shielded(*_):
            try:
                await asyncio.sleep(2)
            except asyncio.CancelledError:
                await asyncio.sleep(2)

        tasks.append(asyncio.ensure_future(asyncio.shield(async_shielded())))
        tasks.append(asyncio.ensure_future(asyncio.sleep(2)))
        tasks.append(asyncio.ensure_future(async_raise()))
        await asyncio.sleep(0.1)
        return 0

    with patch.object(runner, "TASK_CANCELATION_TIMEOUT", 1), patch(
        "homeassistant.bootstrap.async_setup_hass", return_value=hass
    ), patch("threading._shutdown"), patch(
        "homeassistant.core.HomeAssistant.async_run", _async_create_tasks
    ):
        runner.run(default_config)

    assert len(tasks) == 3
    assert (
        "Task could not be canceled and was still running after shutdown" in caplog.text
    )


async def test_unhandled_exception_traceback(
    hass: HomeAssistant, caplog: pytest.LogCaptureFixture
) -> None:
    """Test an unhandled exception gets a traceback in debug mode."""

    raised = asyncio.Event()

    async def _unhandled_exception():
        raised.set()
        raise Exception("This is unhandled")

    try:
        hass.loop.set_debug(True)
        task = asyncio.create_task(_unhandled_exception())
        await raised.wait()
        # Delete it without checking result to trigger unhandled exception
        del task
    finally:
        hass.loop.set_debug(False)

    assert "Task exception was never retrieved" in caplog.text
    assert "This is unhandled" in caplog.text
    assert "_unhandled_exception" in caplog.text


def test__enable_posix_spawn() -> None:
    """Test that we can enable posix_spawn on Alpine."""

    def _mock_alpine_exists(path):
        return path == "/etc/alpine-release"

    with patch.object(runner.subprocess, "_USE_POSIX_SPAWN", False), patch.object(
        runner.os.path, "exists", _mock_alpine_exists
    ):
        runner._enable_posix_spawn()
        assert runner.subprocess._USE_POSIX_SPAWN is True

    with patch.object(runner.subprocess, "_USE_POSIX_SPAWN", False), patch.object(
        runner.os.path, "exists", return_value=False
    ):
        runner._enable_posix_spawn()
        assert runner.subprocess._USE_POSIX_SPAWN is False