"""Test ZHA analog output."""
from unittest.mock import call, patch

import pytest
from zigpy.exceptions import ZigbeeException
from zigpy.profiles import zha
import zigpy.zcl.clusters.general as general
import zigpy.zcl.clusters.lighting as lighting
import zigpy.zcl.foundation as zcl_f

from homeassistant.components.number import DOMAIN as NUMBER_DOMAIN
from homeassistant.const import STATE_UNAVAILABLE, EntityCategory, Platform
from homeassistant.core import HomeAssistant
from homeassistant.helpers import entity_registry as er
from homeassistant.setup import async_setup_component

from .common import (
    async_enable_traffic,
    async_test_rejoin,
    find_entity_id,
    send_attributes_report,
    update_attribute_cache,
)
from .conftest import SIG_EP_INPUT, SIG_EP_OUTPUT, SIG_EP_PROFILE, SIG_EP_TYPE

from tests.common import mock_coro


@pytest.fixture(autouse=True)
def number_platform_only():
    """Limit ZHA setup to the number platform and the base platforms it needs.

    Patching the platform list for the duration of each test keeps the
    tests in this module fast.
    """
    enabled_platforms = (
        Platform.BUTTON,
        Platform.DEVICE_TRACKER,
        Platform.LIGHT,
        Platform.NUMBER,
        Platform.SELECT,
        Platform.SENSOR,
    )
    with patch("homeassistant.components.zha.PLATFORMS", enabled_platforms):
        yield


@pytest.fixture
def zigpy_analog_output_device(zigpy_device_mock):
    """Zigpy device mock with an analog output cluster on endpoint 1."""

    input_clusters = [general.AnalogOutput.cluster_id, general.Basic.cluster_id]
    endpoint_signature = {
        SIG_EP_TYPE: zha.DeviceType.LEVEL_CONTROL_SWITCH,
        SIG_EP_INPUT: input_clusters,
        SIG_EP_OUTPUT: [],
    }
    return zigpy_device_mock({1: endpoint_signature})


@pytest.fixture
async def light(zigpy_device_mock):
    """Zigpy color dimmable light device with level control and color clusters."""

    input_clusters = [
        general.Basic.cluster_id,
        general.Identify.cluster_id,
        general.OnOff.cluster_id,
        general.LevelControl.cluster_id,
        lighting.Color.cluster_id,
    ]
    endpoint_signature = {
        SIG_EP_PROFILE: zha.PROFILE_ID,
        SIG_EP_TYPE: zha.DeviceType.COLOR_DIMMABLE_LIGHT,
        SIG_EP_INPUT: input_clusters,
        SIG_EP_OUTPUT: [general.Ota.cluster_id],
    }
    return zigpy_device_mock(
        {1: endpoint_signature},
        node_descriptor=b"\x02@\x84_\x11\x7fd\x00\x00,d\x00\x00",
    )


async def test_number(
    hass: HomeAssistant, zha_device_joined_restored, zigpy_analog_output_device
) -> None:
    """Test the ZHA number entity created for an analog output cluster."""

    analog_output = zigpy_analog_output_device.endpoints.get(1).analog_output
    analog_output.PLUGGED_ATTR_READS = {
        "max_present_value": 100.0,
        "min_present_value": 1.0,
        "relinquish_default": 50.0,
        "resolution": 1.1,
        "description": "PWM1",
        "engineering_units": 98,
        "application_type": 4 * 0x10000,
    }
    update_attribute_cache(analog_output)
    # present_value is plugged only after the cache update above
    analog_output.PLUGGED_ATTR_READS["present_value"] = 15.0

    zha_device = await zha_device_joined_restored(zigpy_analog_output_device)
    # three read_attributes calls are expected once the fixture has set the
    # device up; together they must cover every configuration attribute
    assert analog_output.read_attributes.call_count == 3
    requested_attributes = set()
    for read_call in analog_output.read_attributes.call_args_list:
        requested_attributes.update(read_call.args[0])
    for attribute_name in (
        "max_present_value",
        "min_present_value",
        "relinquish_default",
        "resolution",
        "description",
        "engineering_units",
        "application_type",
    ):
        assert attribute_name in requested_attributes

    entity_id = await find_entity_id(Platform.NUMBER, zha_device, hass)
    assert entity_id is not None

    # with traffic disabled the number exists but must be unavailable
    await async_enable_traffic(hass, [zha_device], enabled=False)
    assert hass.states.get(entity_id).state == STATE_UNAVAILABLE

    # enabling traffic through the gateway and device adds three more reads
    assert analog_output.read_attributes.call_count == 3
    await async_enable_traffic(hass, [zha_device])
    await hass.async_block_till_done()
    assert analog_output.read_attributes.call_count == 6

    # the state has moved from unavailable to the plugged present_value
    assert hass.states.get(entity_id).state == "15.0"

    # the entity attributes mirror the plugged cluster configuration
    state_attributes = hass.states.get(entity_id).attributes
    expected_attributes = {
        "min": 1.0,
        "max": 100.0,
        "step": 1.1,
        "icon": "mdi:percent",
        "unit_of_measurement": "%",
        "friendly_name": "FakeManufacturer FakeModel Number PWM1",
    }
    for attribute_key, expected_value in expected_attributes.items():
        assert state_attributes.get(attribute_key) == expected_value

    # an attribute report carrying the current value leaves the state as is
    assert analog_output.read_attributes.call_count == 6
    await send_attributes_report(hass, analog_output, {0x0055: 15})
    assert hass.states.get(entity_id).state == "15.0"

    # an attribute report carrying a new value updates the state
    await send_attributes_report(hass, analog_output, {0x0055: 20})
    assert hass.states.get(entity_id).state == "20.0"

    # setting the value from HA writes present_value to the cluster
    write_patch = patch(
        "zigpy.zcl.Cluster.write_attributes",
        return_value=mock_coro([zcl_f.Status.SUCCESS, zcl_f.Status.SUCCESS]),
    )
    with write_patch:
        await hass.services.async_call(
            NUMBER_DOMAIN,
            "set_value",
            {"entity_id": entity_id, "value": 30.0},
            blocking=True,
        )
        assert len(analog_output.write_attributes.mock_calls) == 1
        expected_write = call({"present_value": 30.0})
        assert analog_output.write_attributes.call_args == expected_write
        analog_output.PLUGGED_ATTR_READS["present_value"] = 30.0

    # a rejoin reads the cluster three more times and picks up the new value
    assert analog_output.read_attributes.call_count == 6
    await async_test_rejoin(hass, zigpy_analog_output_device, [analog_output], (1,))
    assert hass.states.get(entity_id).state == "30.0"
    assert analog_output.read_attributes.call_count == 9

    # change the plugged read value without sending an attribute report;
    # the entity must keep showing the old value
    analog_output.PLUGGED_ATTR_READS["present_value"] = 40.0
    assert hass.states.get(entity_id).state == "30.0"

    await async_setup_component(hass, "homeassistant", {})
    await hass.async_block_till_done()

    # an explicit entity update issues one more read that includes present_value
    await hass.services.async_call(
        "homeassistant", "update_entity", {"entity_id": entity_id}, blocking=True
    )
    assert hass.states.get(entity_id).state == "40.0"
    assert analog_output.read_attributes.call_count == 10
    assert "present_value" in analog_output.read_attributes.call_args.args[0]


@pytest.mark.parametrize(
    ("attr", "initial_value", "new_value"),
    (
        ("on_off_transition_time", 20, 5),
        ("on_level", 255, 50),
        ("on_transition_time", 5, 1),
        ("off_transition_time", 5, 1),
        ("default_move_rate", 1, 5),
        ("start_up_current_level", 254, 125),
    ),
)
async def test_level_control_number(
    hass: HomeAssistant, light, zha_device_joined, attr, initial_value, new_value
) -> None:
    """Test the level control configuration number entities of a newly joined light."""

    entity_registry = er.async_get(hass)
    level_cluster = light.endpoints[1].level
    level_cluster.PLUGGED_ATTR_READS = {attr: initial_value}
    zha_device = await zha_device_joined(light)

    entity_id = await find_entity_id(
        Platform.NUMBER, zha_device, hass, qualifier=attr
    )
    assert entity_id is not None

    async def set_new_value() -> None:
        """Call number.set_value with new_value on the entity under test."""
        await hass.services.async_call(
            "number",
            "set_value",
            {"entity_id": entity_id, "value": new_value},
            blocking=True,
        )

    # joining must have issued exactly three reads, including the ones below
    assert level_cluster.read_attributes.call_count == 3
    expected_reads = (
        call(
            [
                "on_off_transition_time",
                "on_level",
                "on_transition_time",
                "off_transition_time",
                "default_move_rate",
            ],
            allow_cache=True,
            only_cache=False,
            manufacturer=None,
        ),
        call(
            ["start_up_current_level"],
            allow_cache=True,
            only_cache=False,
            manufacturer=None,
        ),
        call(
            ["current_level"],
            allow_cache=False,
            only_cache=False,
            manufacturer=None,
        ),
    )
    for expected_read in expected_reads:
        assert expected_read in level_cluster.read_attributes.call_args_list

    state = hass.states.get(entity_id)
    assert state
    assert state.state == str(initial_value)

    entity_entry = entity_registry.async_get(entity_id)
    assert entity_entry
    assert entity_entry.entity_category == EntityCategory.CONFIG

    # setting the number writes the attribute and updates the state
    await set_new_value()

    assert level_cluster.write_attributes.call_count == 1
    assert level_cluster.write_attributes.call_args.args[0] == {attr: new_value}

    state = hass.states.get(entity_id)
    assert state
    assert state.state == str(new_value)

    level_cluster.read_attributes.reset_mock()
    await async_setup_component(hass, "homeassistant", {})
    await hass.async_block_till_done()

    await hass.services.async_call(
        "homeassistant", "update_entity", {"entity_id": entity_id}, blocking=True
    )
    # the mocking doesn't update the attr cache so this flips back to initial value
    assert hass.states.get(entity_id).state == str(initial_value)
    assert level_cluster.read_attributes.call_count == 1
    refresh_read = call(
        [attr], allow_cache=False, only_cache=False, manufacturer=None
    )
    assert refresh_read in level_cluster.read_attributes.call_args_list

    # a write that raises must still be attempted once and leave the state alone
    level_cluster.write_attributes.reset_mock()
    level_cluster.write_attributes.side_effect = ZigbeeException

    await set_new_value()

    assert level_cluster.write_attributes.call_count == 1
    assert level_cluster.write_attributes.call_args.args[0] == {attr: new_value}
    assert hass.states.get(entity_id).state == str(initial_value)


@pytest.mark.parametrize(
    ("attr", "initial_value", "new_value"),
    (("start_up_color_temperature", 500, 350),),
)
async def test_color_number(
    hass: HomeAssistant, light, zha_device_joined, attr, initial_value, new_value
) -> None:
    """Test the color cluster configuration number entities of a newly joined light."""

    entity_registry = er.async_get(hass)
    color = light.endpoints[1].light_color
    color.PLUGGED_ATTR_READS = {attr: initial_value}
    zha_device = await zha_device_joined(light)

    entity_id = await find_entity_id(
        Platform.NUMBER, zha_device, hass, qualifier=attr
    )
    assert entity_id is not None

    async def set_new_value() -> None:
        """Call number.set_value with new_value on the entity under test."""
        await hass.services.async_call(
            "number",
            "set_value",
            {"entity_id": entity_id, "value": new_value},
            blocking=True,
        )

    # joining must have issued exactly three reads, including the one below
    assert color.read_attributes.call_count == 3
    join_read = call(
        [
            "color_temp_physical_min",
            "color_temp_physical_max",
            "color_capabilities",
            "start_up_color_temperature",
            "options",
        ],
        allow_cache=True,
        only_cache=False,
        manufacturer=None,
    )
    assert join_read in color.read_attributes.call_args_list

    state = hass.states.get(entity_id)
    assert state
    assert state.state == str(initial_value)

    entity_entry = entity_registry.async_get(entity_id)
    assert entity_entry
    assert entity_entry.entity_category == EntityCategory.CONFIG

    # setting the number writes the attribute and updates the state
    await set_new_value()

    assert color.write_attributes.call_count == 1
    assert color.write_attributes.call_args.args[0] == {attr: new_value}

    state = hass.states.get(entity_id)
    assert state
    assert state.state == str(new_value)

    color.read_attributes.reset_mock()
    await async_setup_component(hass, "homeassistant", {})
    await hass.async_block_till_done()

    await hass.services.async_call(
        "homeassistant", "update_entity", {"entity_id": entity_id}, blocking=True
    )
    # the mocking doesn't update the attr cache so this flips back to initial value
    assert hass.states.get(entity_id).state == str(initial_value)
    assert color.read_attributes.call_count == 1
    refresh_read = call(
        [attr], allow_cache=False, only_cache=False, manufacturer=None
    )
    assert refresh_read in color.read_attributes.call_args_list

    # a write that raises must still be attempted once and leave the state alone
    color.write_attributes.reset_mock()
    color.write_attributes.side_effect = ZigbeeException

    await set_new_value()

    assert color.write_attributes.call_count == 1
    assert color.write_attributes.call_args.args[0] == {attr: new_value}
    assert hass.states.get(entity_id).state == str(initial_value)