"""The tests for the device tracker component."""
from datetime import datetime, timedelta
import json
import logging
import os

import pytest

from homeassistant.components import zone
import homeassistant.components.device_tracker as device_tracker
from homeassistant.components.device_tracker import const, legacy
from homeassistant.const import (
    ATTR_ENTITY_PICTURE,
    ATTR_FRIENDLY_NAME,
    ATTR_GPS_ACCURACY,
    ATTR_ICON,
    ATTR_LATITUDE,
    ATTR_LONGITUDE,
    CONF_PLATFORM,
    STATE_HOME,
    STATE_NOT_HOME,
)
from homeassistant.core import State, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import discovery
from homeassistant.helpers.json import JSONEncoder
from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util

from tests.async_mock import Mock, call, patch
from tests.common import (
    assert_setup_component,
    async_fire_time_changed,
    mock_registry,
    mock_restore_cache,
    patch_yaml_files,
)
from tests.components.device_tracker import common

# Minimal component configuration that selects the "test" device tracker platform.
TEST_PLATFORM = {device_tracker.DOMAIN: {CONF_PLATFORM: "test"}}

# Module-level logger; tests use it to dump recorded mock calls at DEBUG level.
_LOGGER = logging.getLogger(__name__)


@pytest.fixture(name="yaml_devices")
def mock_yaml_devices(hass):
    """Yield the path of the YAML devices file, removing the file before and after."""

    def _remove_if_present(path):
        """Delete path when it exists as a regular file."""
        if os.path.isfile(path):
            os.remove(path)

    devices_path = hass.config.path(legacy.YAML_DEVICES)
    # Start from a clean slate and leave nothing behind for the next test.
    _remove_if_present(devices_path)
    yield devices_path
    _remove_if_present(devices_path)


async def test_is_on(hass):
    """Check that is_on follows the home / not_home state of a tracker entity."""
    tracker_id = f"{const.DOMAIN}.test"

    # With the state set to home, is_on must be truthy.
    hass.states.async_set(tracker_id, STATE_HOME)
    assert device_tracker.is_on(hass, tracker_id)

    # With the state set to not_home, is_on must be falsy.
    hass.states.async_set(tracker_id, STATE_NOT_HOME)
    assert not device_tracker.is_on(hass, tracker_id)


async def test_reading_broken_yaml_config(hass):
    """Check loading of known-devices files that contain invalid data.

    Files without any usable device must load as an empty list, while files
    that hold one valid device must load exactly that device.
    """
    yaml_sources = {
        "empty.yaml": "",
        "nodict.yaml": "100",
        "badkey.yaml": "@:\n  name: Device",
        "noname.yaml": "my_device:\n",
        "allok.yaml": "My Device:\n  name: Device",
        "oneok.yaml": ("My Device!:\n  name: Device\nbad_device:\n  nme: Device"),
    }
    load_kwargs = {"hass": hass, "consider_home": timedelta(seconds=60)}

    with patch_yaml_files(yaml_sources):
        # None of these files contains a device that can be loaded.
        for broken_file in ("empty.yaml", "nodict.yaml", "noname.yaml", "badkey.yaml"):
            assert await legacy.async_load_config(broken_file, **load_kwargs) == []

        # Both files yield a single device whose id is the slug "my_device".
        for usable_file in ("allok.yaml", "oneok.yaml"):
            loaded = await legacy.async_load_config(usable_file, **load_kwargs)
            assert len(loaded) == 1
            only_device = loaded[0]
            assert only_device.name == "Device"
            assert only_device.dev_id == "my_device"


async def test_reading_yaml_config(hass, yaml_devices):
    """Write a device to the YAML file and check it is read back unchanged."""
    dev_id = "test"
    written = legacy.Device(
        hass,
        timedelta(seconds=180),
        True,
        dev_id,
        "AB:CD:EF:GH:IJ",
        "Test name",
        picture="http://test.picture",
        icon="mdi:kettle",
    )
    # update_config is handed to the executor rather than called on the loop.
    await hass.async_add_executor_job(
        legacy.update_config, yaml_devices, dev_id, written
    )
    assert await async_setup_component(hass, device_tracker.DOMAIN, TEST_PLATFORM)

    loaded_devices = await legacy.async_load_config(
        yaml_devices, hass, written.consider_home
    )
    loaded = loaded_devices[0]

    # Every persisted setting must survive the round trip.
    assert written.dev_id == loaded.dev_id
    assert written.track == loaded.track
    assert written.mac == loaded.mac
    assert written.config_picture == loaded.config_picture
    assert written.consider_home == loaded.consider_home
    assert written.icon == loaded.icon


@patch("homeassistant.components.device_tracker.const.LOGGER.warning")
async def test_duplicate_mac_dev_id(mock_warning, hass):
    """Check that DeviceTracker warns once about duplicate MACs and device IDs."""

    def _make_device(dev_id, mac, name):
        """Build a device that differs only in id, MAC and name."""
        return legacy.Device(hass, True, True, dev_id, mac, name, None, None, False)

    # Case 1: two devices sharing the same MAC address.
    same_mac = [
        _make_device("my_device", "AB:01", "My device"),
        _make_device("your_device", "AB:01", "Your device"),
    ]
    legacy.DeviceTracker(hass, False, True, {}, same_mac)
    _LOGGER.debug(mock_warning.call_args_list)
    assert (
        mock_warning.call_count == 1
    ), "The only warning call should be duplicates (check DEBUG)"
    positional = mock_warning.call_args[0]
    assert "Duplicate device MAC" in positional[0], "Duplicate MAC warning expected"

    # Case 2: two devices sharing the same device id.
    mock_warning.reset_mock()
    same_dev_id = [
        _make_device("my_device", "AB:01", "My device"),
        _make_device("my_device", None, "Your device"),
    ]
    legacy.DeviceTracker(hass, False, True, {}, same_dev_id)

    _LOGGER.debug(mock_warning.call_args_list)
    assert (
        mock_warning.call_count == 1
    ), "The only warning call should be duplicates (check DEBUG)"
    positional = mock_warning.call_args[0]
    assert (
        "Duplicate device IDs" in positional[0]
    ), "Duplicate device IDs warning expected"


async def test_setup_without_yaml_file(hass):
    """Set up the component without providing a YAML devices file."""
    with assert_setup_component(1, device_tracker.DOMAIN):
        setup_ok = await async_setup_component(
            hass, device_tracker.DOMAIN, TEST_PLATFORM
        )
        assert setup_ok


async def test_gravatar(hass):
    """Check that a Gravatar address produces the expected picture URL."""
    expected_url = (
        "https://www.gravatar.com/avatar/"
        "55502f40dc8b7c769880b10874abc9d0.jpg?s=80&d=wavatar"
    )
    tracked = legacy.Device(
        hass,
        timedelta(seconds=180),
        True,
        "test",
        "AB:CD:EF:GH:IJ",
        "Test name",
        gravatar="test@example.com",
    )
    assert tracked.config_picture == expected_url


async def test_gravatar_and_picture(hass):
    """Check that the Gravatar URL wins when a picture is configured as well."""
    expected_url = (
        "https://www.gravatar.com/avatar/"
        "55502f40dc8b7c769880b10874abc9d0.jpg?s=80&d=wavatar"
    )
    tracked = legacy.Device(
        hass,
        timedelta(seconds=180),
        True,
        "test",
        "AB:CD:EF:GH:IJ",
        "Test name",
        picture="http://test.picture",
        gravatar="test@example.com",
    )
    # The explicit picture must not show up as the configured picture.
    assert tracked.config_picture == expected_url


@patch("homeassistant.components.device_tracker.legacy.DeviceTracker.see")
@patch("homeassistant.components.demo.device_tracker.setup_scanner", autospec=True)
async def test_discover_platform(mock_demo_setup_scanner, mock_see, hass):
    """Check that discovering the demo platform sets up its scanner."""
    await discovery.async_load_platform(
        hass, device_tracker.DOMAIN, "demo", {"test_key": "test_val"}, {"bla": {}}
    )
    await hass.async_block_till_done()

    assert device_tracker.DOMAIN in hass.config.components
    assert mock_demo_setup_scanner.called

    # The scanner gets hass, an empty config, the see callback and the
    # discovery info, in that order.
    expected_args = (hass, {}, mock_see, {"test_key": "test_val"})
    assert mock_demo_setup_scanner.call_args[0] == expected_args


async def test_update_stale(hass, mock_device_tracker_conf):
    """Check that a device which left is marked not_home by a later scan.

    The device is home when the component is set up at ``register_time``;
    after it leaves, a time change to ``scan_time`` (one minute later, with
    consider_home configured as 59) must flip its state to not_home.
    """
    utcnow_path = "homeassistant.components.device_tracker.legacy.dt_util.utcnow"
    entity_id = "device_tracker.dev1"

    scanner = getattr(hass.components, "test.device_tracker").SCANNER
    scanner.reset()
    scanner.come_home("DEV1")

    # Use fixed timestamps in the following year.
    next_year = dt_util.utcnow().year + 1
    register_time = datetime(next_year, 9, 15, 23, tzinfo=dt_util.UTC)
    scan_time = datetime(next_year, 9, 15, 23, 1, tzinfo=dt_util.UTC)

    tracker_config = {
        device_tracker.DOMAIN: {
            CONF_PLATFORM: "test",
            device_tracker.CONF_CONSIDER_HOME: 59,
        }
    }
    with patch(utcnow_path, return_value=register_time), assert_setup_component(
        1, device_tracker.DOMAIN
    ):
        assert await async_setup_component(hass, device_tracker.DOMAIN, tracker_config)
        await hass.async_block_till_done()

    assert hass.states.get(entity_id).state == STATE_HOME

    scanner.leave_home("DEV1")

    with patch(utcnow_path, return_value=scan_time):
        async_fire_time_changed(hass, scan_time)
        await hass.async_block_till_done()

    assert hass.states.get(entity_id).state == STATE_NOT_HOME


async def test_entity_attributes(hass, mock_device_tracker_conf):
    """Check that name, icon and picture of a configured device reach its state."""
    dev_id = "test_entity"
    expected_name = "Paulus"
    expected_picture = "http://placehold.it/200x200"
    expected_icon = "mdi:kettle"

    # Pre-populate the mocked device configuration before the component is set up.
    mock_device_tracker_conf.append(
        legacy.Device(
            hass,
            timedelta(seconds=180),
            True,
            dev_id,
            None,
            expected_name,
            expected_picture,
            icon=expected_icon,
        )
    )

    with assert_setup_component(1, device_tracker.DOMAIN):
        assert await async_setup_component(hass, device_tracker.DOMAIN, TEST_PLATFORM)

    state_attrs = hass.states.get(f"{const.DOMAIN}.{dev_id}").attributes

    assert state_attrs.get(ATTR_FRIENDLY_NAME) == expected_name
    assert state_attrs.get(ATTR_ICON) == expected_icon
    assert state_attrs.get(ATTR_ENTITY_PICTURE) == expected_picture


@patch("homeassistant.components.device_tracker.legacy.DeviceTracker.async_see")
async def test_see_service(mock_see, hass):
    """Test the see service with an ASCII and a non-ASCII dev_id and no MAC.

    ``DeviceTracker.async_see`` is patched out, so the test only verifies that
    each request made through ``common.async_see`` reaches it exactly once
    and with the same parameters.
    """
    with assert_setup_component(1, device_tracker.DOMAIN):
        assert await async_setup_component(hass, device_tracker.DOMAIN, TEST_PLATFORM)
    params = {
        "dev_id": "some_device",
        "host_name": "example.com",
        "location_name": "Work",
        "gps": [0.3, 0.8],
        "attributes": {"test": "test"},
    }
    common.async_see(hass, **params)
    await hass.async_block_till_done()
    assert mock_see.call_count == 1
    assert mock_see.call_args == call(**params)

    # Repeat with a non-ASCII device id: chr(233) is "e" with an acute accent,
    # as seen in ids coming from iCloud.
    mock_see.reset_mock()
    params["dev_id"] += chr(233)

    common.async_see(hass, **params)
    await hass.async_block_till_done()
    assert mock_see.call_count == 1
    assert mock_see.call_args == call(**params)


async def test_see_service_guard_config_entry(hass, mock_device_tracker_conf):
    """Check that see ignores a device already present in the entity registry."""
    dev_id = "test"
    registry_entry = Mock()
    mock_registry(hass, {f"{const.DOMAIN}.{dev_id}": registry_entry})
    assert await async_setup_component(hass, device_tracker.DOMAIN, TEST_PLATFORM)

    common.async_see(hass, dev_id=dev_id, gps=[0.3, 0.8])
    await hass.async_block_till_done()

    # No legacy device may have been created for the registered entity.
    assert not mock_device_tracker_conf


async def test_new_device_event_fired(hass, mock_device_tracker_conf):
    """Check that a new device fires exactly one serializable event."""
    with assert_setup_component(1, device_tracker.DOMAIN):
        assert await async_setup_component(hass, device_tracker.DOMAIN, TEST_PLATFORM)

    captured = []

    @callback
    def _record(event):
        """Collect every event delivered to this listener."""
        captured.append(event)

    hass.bus.async_listen("device_tracker_new_device", _record)

    # Seeing the same device twice must announce it only once.
    for _ in range(2):
        common.async_see(hass, "mac_1", host_name="hello")

    await hass.async_block_till_done()

    assert len(captured) == 1
    new_device_event = captured[0]

    # The event has to be JSON serializable; dumps raises otherwise.
    json.dumps(new_device_event.as_dict(), cls=JSONEncoder)

    assert new_device_event.data == {
        "entity_id": "device_tracker.hello",
        "host_name": "hello",
        "mac": "MAC_1",
    }


async def test_duplicate_yaml_keys(hass, mock_device_tracker_conf):
    """Check that two devices sharing a host name get distinct device ids."""
    with assert_setup_component(1, device_tracker.DOMAIN):
        assert await async_setup_component(hass, device_tracker.DOMAIN, TEST_PLATFORM)

    # Different MACs, identical host name.
    for mac in ("mac_1", "mac_2"):
        common.async_see(hass, mac, host_name="hello")

    await hass.async_block_till_done()

    assert len(mock_device_tracker_conf) == 2
    first, second = mock_device_tracker_conf
    assert first.dev_id != second.dev_id


async def test_invalid_dev_id(hass, mock_device_tracker_conf):
    """Check that a see call with an invalid dev_id creates no device."""
    with assert_setup_component(1, device_tracker.DOMAIN):
        setup_ok = await async_setup_component(
            hass, device_tracker.DOMAIN, TEST_PLATFORM
        )
        assert setup_ok

    # "hello-world" is the invalid id under test.
    common.async_see(hass, dev_id="hello-world")
    await hass.async_block_till_done()

    assert not mock_device_tracker_conf


async def test_see_state(hass, yaml_devices):
    """Check that a see call is persisted and reflected in the entity state."""
    assert await async_setup_component(hass, device_tracker.DOMAIN, TEST_PLATFORM)

    see_params = {
        "mac": "AA:BB:CC:DD:EE:FF",
        "dev_id": "some_device",
        "host_name": "example.com",
        "location_name": "Work",
        "gps": [0.3, 0.8],
        "gps_accuracy": 1,
        "battery": 100,
        "attributes": {"test": "test", "number": 1},
    }
    common.async_see(hass, **see_params)
    await hass.async_block_till_done()

    # The new device must have been written to the YAML devices file.
    stored = await legacy.async_load_config(yaml_devices, hass, timedelta(seconds=0))
    assert len(stored) == 1

    state = hass.states.get("device_tracker.example_com")
    assert state.state == "Work"
    assert state.object_id == "example_com"
    assert state.name == "example.com"

    expected_attrs = {
        "friendly_name": "example.com",
        "battery": 100,
        "latitude": 0.3,
        "longitude": 0.8,
        "test": "test",
        "gps_accuracy": 1,
        "source_type": "gps",
        "number": 1,
    }
    state_attrs = state.attributes
    for attr_name, attr_value in expected_attrs.items():
        assert state_attrs[attr_name] == attr_value


async def test_see_passive_zone_state(hass, mock_device_tracker_conf):
    """Check the GPS attributes of a scanner-tracked device at home and away.

    While the device is home its state carries the coordinates of the Home
    zone; once it is not_home the GPS attributes must be absent.
    """
    utcnow_path = "homeassistant.components.device_tracker.legacy.dt_util.utcnow"
    entity_id = "device_tracker.dev1"

    # Use fixed timestamps in the following year.
    next_year = dt_util.utcnow().year + 1
    register_time = datetime(next_year, 9, 15, 23, tzinfo=dt_util.UTC)
    scan_time = datetime(next_year, 9, 15, 23, 1, tzinfo=dt_util.UTC)

    home_zone = {
        "name": "Home",
        "latitude": 1,
        "longitude": 2,
        "radius": 250,
        "passive": False,
    }
    with assert_setup_component(1, zone.DOMAIN):
        await async_setup_component(hass, zone.DOMAIN, {"zone": home_zone})

    scanner = getattr(hass.components, "test.device_tracker").SCANNER
    scanner.reset()
    scanner.come_home("dev1")

    tracker_config = {
        device_tracker.DOMAIN: {
            CONF_PLATFORM: "test",
            device_tracker.CONF_CONSIDER_HOME: 59,
        }
    }
    with patch(utcnow_path, return_value=register_time), assert_setup_component(
        1, device_tracker.DOMAIN
    ):
        assert await async_setup_component(hass, device_tracker.DOMAIN, tracker_config)
        await hass.async_block_till_done()

    # At home: the coordinates match the Home zone defined above.
    home_state = hass.states.get(entity_id)
    home_attrs = home_state.attributes
    assert home_state.state == STATE_HOME
    assert home_state.object_id == "dev1"
    assert home_state.name == "dev1"
    assert home_attrs.get("friendly_name") == "dev1"
    assert home_attrs.get("latitude") == 1
    assert home_attrs.get("longitude") == 2
    assert home_attrs.get("gps_accuracy") == 0
    assert home_attrs.get("source_type") == device_tracker.SOURCE_TYPE_ROUTER

    scanner.leave_home("dev1")

    with patch(utcnow_path, return_value=scan_time):
        async_fire_time_changed(hass, scan_time)
        await hass.async_block_till_done()

    # Away: no GPS information may remain on the state.
    away_state = hass.states.get(entity_id)
    away_attrs = away_state.attributes
    assert away_state.state == STATE_NOT_HOME
    assert away_state.object_id == "dev1"
    assert away_state.name == "dev1"
    assert away_attrs.get("friendly_name") == "dev1"
    assert away_attrs.get("latitude") is None
    assert away_attrs.get("longitude") is None
    assert away_attrs.get("gps_accuracy") is None
    assert away_attrs.get("source_type") == device_tracker.SOURCE_TYPE_ROUTER


@patch("homeassistant.components.device_tracker.const.LOGGER.warning")
async def test_see_failures(mock_warning, hass, mock_device_tracker_conf):
    """Check how the device tracker handles malformed see calls."""
    tracker = legacy.DeviceTracker(hass, timedelta(seconds=60), 0, {}, [])

    # A non-string MAC is accepted and the device is added.
    await tracker.async_see(mac=567, host_name="Number MAC")

    # Neither device id nor MAC: the call is rejected and nothing is added.
    with pytest.raises(HomeAssistantError):
        await tracker.async_see()
    assert mock_warning.call_count == 0

    # Invalid GPS values are ignored: each device is still added, with a warning.
    bad_gps_calls = (
        ("mac_1_bad_gps", 1),
        ("mac_2_bad_gps", [1]),
        ("mac_3_bad_gps", "gps"),
    )
    for bad_mac, bad_gps in bad_gps_calls:
        await tracker.async_see(mac=bad_mac, gps=bad_gps)
    await hass.async_block_till_done()

    assert mock_warning.call_count == 3
    # One device for the numeric MAC plus three for the bad-GPS calls.
    assert len(mock_device_tracker_conf) == 4


async def test_async_added_to_hass(hass):
    """Check that state and attributes of a known device are restored on setup."""
    attr = {
        ATTR_LONGITUDE: 18,
        ATTR_LATITUDE: -33,
        const.ATTR_SOURCE_TYPE: "gps",
        ATTR_GPS_ACCURACY: 2,
        const.ATTR_BATTERY: 100,
    }
    mock_restore_cache(hass, [State("device_tracker.jk", "home", attr)])

    # Serve a YAML devices file that declares the tracked device "jk".
    devices_path = hass.config.path(legacy.YAML_DEVICES)
    with patch_yaml_files({devices_path: "jk:\n  name: JK Phone\n  track: True"}):
        assert await async_setup_component(hass, device_tracker.DOMAIN, {})

    restored = hass.states.get("device_tracker.jk")
    assert restored
    assert restored.state == "home"

    # Each cached attribute must be present again with its cached value.
    for key, val in attr.items():
        atr = restored.attributes.get(key)
        assert atr == val, f"{key}={atr} expected: {val}"


async def test_bad_platform(hass):
    """Check that setup succeeds with zero platforms when the platform is unknown."""
    bad_config = {"device_tracker": [{"platform": "bad_platform"}]}
    with assert_setup_component(0, device_tracker.DOMAIN):
        setup_ok = await async_setup_component(hass, device_tracker.DOMAIN, bad_config)
        assert setup_ok


async def test_adding_unknown_device_to_config(mock_device_tracker_conf, hass):
    """Check that a device found by the scanner is added to the configuration."""
    scanner = getattr(hass.components, "test.device_tracker").SCANNER
    scanner.reset()
    scanner.come_home("DEV1")

    platform_config = {device_tracker.DOMAIN: {CONF_PLATFORM: "test"}}
    await async_setup_component(hass, device_tracker.DOMAIN, platform_config)
    await hass.async_block_till_done()

    assert len(mock_device_tracker_conf) == 1
    (new_device,) = mock_device_tracker_conf
    # The id is the lower-cased scanner name and the device is tracked.
    assert new_device.dev_id == "dev1"
    assert new_device.track


async def test_picture_and_icon_on_see_discovery(mock_device_tracker_conf, hass):
    """Check that picture and icon passed to the first see call are stored."""
    tracker = legacy.DeviceTracker(hass, timedelta(seconds=60), False, {}, [])

    await tracker.async_see(dev_id=11, picture="pic_url", icon="mdi:icon")
    await hass.async_block_till_done()

    assert len(mock_device_tracker_conf) == 1
    discovered = mock_device_tracker_conf[0]
    assert discovered.icon == "mdi:icon"
    assert discovered.entity_picture == "pic_url"


async def test_backward_compatibility_for_track_new(mock_device_tracker_conf, hass):
    """Check track_new backward compatibility.

    The tracker is created with False as its third argument while the defaults
    dict sets track_new to True; the newly seen device must not be tracked.
    """
    defaults = {device_tracker.CONF_TRACK_NEW: True}
    tracker = legacy.DeviceTracker(hass, timedelta(seconds=60), False, defaults, [])

    await tracker.async_see(dev_id=13)
    await hass.async_block_till_done()

    assert len(mock_device_tracker_conf) == 1
    new_device = mock_device_tracker_conf[0]
    assert new_device.track is False


async def test_old_style_track_new_is_skipped(mock_device_tracker_conf, hass):
    """Check the old-style track_new handling.

    The tracker is created with None as its third argument while the defaults
    dict sets track_new to False; the newly seen device must not be tracked.
    """
    defaults = {device_tracker.CONF_TRACK_NEW: False}
    tracker = legacy.DeviceTracker(hass, timedelta(seconds=60), None, defaults, [])

    await tracker.async_see(dev_id=14)
    await hass.async_block_till_done()

    assert len(mock_device_tracker_conf) == 1
    new_device = mock_device_tracker_conf[0]
    assert new_device.track is False


def test_see_schema_allowing_ios_calls():
    """Check that the see payload schema tolerates extra keys.

    Temporary workaround because the iOS app sends incorrect data. The test
    passes as long as validating the payload does not raise.
    """
    ios_payload = {
        "dev_id": "Test",
        "battery": 35,
        "battery_status": "Not Charging",
        "gps": [10.0, 10.0],
        "gps_accuracy": 300,
        "hostname": "beer",
    }
    device_tracker.SERVICE_SEE_PAYLOAD_SCHEMA(ios_payload)