"""The tests for recording streams."""
import asyncio
from datetime import timedelta
from io import BytesIO
import os
from unittest.mock import patch

import av
import pytest

from homeassistant.components.stream import Stream, create_stream
from homeassistant.components.stream.const import (
from homeassistant.components.stream.core import Part
from homeassistant.components.stream.fmp4utils import find_box
from homeassistant.exceptions import HomeAssistantError
from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util

from .common import (
    DefaultSegment as Segment,

from tests.common import async_fire_time_changed

async def stream_component(hass):
    """Set up the component before each test."""
    await async_setup_component(hass, "stream", {"stream": {}})

def filename(tmpdir):
    """Use this filename for the tests."""
    return f"{tmpdir}/test.mp4"

async def test_record_stream(hass, filename, h264_video):
    """Test record stream."""

    worker_finished = asyncio.Event()

    class MockStream(Stream):
        """Mock Stream so we can patch remove_provider."""

        async def remove_provider(self, provider):
            """Add a finished event to Stream.remove_provider."""
            await Stream.remove_provider(self, provider)

    with patch("homeassistant.components.stream.Stream", wraps=MockStream):
        stream = create_stream(hass, h264_video, {})

    with patch.object(hass.config, "is_allowed_path", return_value=True):
        make_recording = hass.async_create_task(stream.async_record(filename))

        # In general usage the recorder will only include what has already been
        # processed by the worker. To guarantee we have some output for the test,
        # wait until the worker has finished before firing
        await worker_finished.wait()

        # Fire the IdleTimer
        future = dt_util.utcnow() + timedelta(seconds=30)
        async_fire_time_changed(hass, future)

        await make_recording

    # Assert
    assert os.path.exists(filename)

async def test_record_lookback(hass, filename, h264_video):
    """Exercise record with lookback."""

    stream = create_stream(hass, h264_video, {})

    # Start an HLS feed to enable lookback
    await stream.start()

    with patch.object(hass.config, "is_allowed_path", return_value=True):
        await stream.async_record(filename, lookback=4)

    # This test does not need recorder cleanup since it is not fully exercised

    await stream.stop()

async def test_record_path_not_allowed(hass, h264_video):
    """Test where the output path is not allowed by home assistant configuration."""

    stream = create_stream(hass, h264_video, {})
    with patch.object(
        hass.config, "is_allowed_path", return_value=False
    ), pytest.raises(HomeAssistantError):
        await stream.async_record("/example/path")

def add_parts_to_segment(segment, source):
    """Add relevant part data to segment for testing recorder."""
    moof_locs = list(find_box(source.getbuffer(), b"moof")) + [len(source.getbuffer())]
    segment.init = source.getbuffer()[: moof_locs[0]].tobytes()
    segment.parts = [
            data=source.getbuffer()[moof_locs[i] : moof_locs[i + 1]],
        for i in range(len(moof_locs) - 1)

async def test_recorder_discontinuity(hass, filename, h264_video):
    """Test recorder save across a discontinuity."""

    # Run
    segment_1 = Segment(sequence=1, stream_id=0)
    add_parts_to_segment(segment_1, h264_video)
    segment_1.duration = 4
    segment_2 = Segment(sequence=2, stream_id=1)
    add_parts_to_segment(segment_2, h264_video)
    segment_2.duration = 4

    provider_ready = asyncio.Event()

    class MockStream(Stream):
        """Mock Stream so we can patch add_provider."""

        async def start(self):
            """Make Stream.start a noop that gives up async context."""
            await asyncio.sleep(0)

        def add_provider(self, fmt, timeout=OUTPUT_IDLE_TIMEOUT):
            """Add a finished event to Stream.add_provider."""
            provider = Stream.add_provider(self, fmt, timeout)
            return provider

    with patch.object(hass.config, "is_allowed_path", return_value=True), patch(
        "homeassistant.components.stream.Stream", wraps=MockStream
    ), patch("homeassistant.components.stream.recorder.RecorderOutput.recv"):
        stream = create_stream(hass, "blank", {})
        make_recording = hass.async_create_task(stream.async_record(filename))
        await provider_ready.wait()

        recorder_output = stream.outputs()[RECORDER_PROVIDER]
        recorder_output._segments.extend([segment_1, segment_2])

        # Fire the IdleTimer
        future = dt_util.utcnow() + timedelta(seconds=30)
        async_fire_time_changed(hass, future)

        await make_recording
    # Assert
    assert os.path.exists(filename)

async def test_recorder_no_segments(hass, filename):
    """Test recorder behavior with a stream failure which causes no segments."""

    stream = create_stream(hass, BytesIO(), {})

    # Run
    with patch.object(hass.config, "is_allowed_path", return_value=True):
        await stream.async_record(filename)

    # Assert
    assert not os.path.exists(filename)

def h264_mov_video():
    """Generate a source video with no audio."""
    return generate_h264_video(container_format="mov")

        ("aac", 1),  # aac is a valid mp4 codec
        ("pcm_mulaw", 0),  # G.711 is not a valid mp4 codec
        ("empty", 0),  # audio stream with no packets
        (None, 0),  # no audio stream
async def test_record_stream_audio(
    Test treatment of different audio inputs.

    Record stream output should have an audio channel when input has
    a valid codec and audio packets and no audio channel otherwise.

    # Remux source video with new audio
    source = remux_with_audio(h264_mov_video, "mov", audio_codec)  # mov can store PCM

    worker_finished = asyncio.Event()

    class MockStream(Stream):
        """Mock Stream so we can patch remove_provider."""

        async def remove_provider(self, provider):
            """Add a finished event to Stream.remove_provider."""
            await Stream.remove_provider(self, provider)

    with patch("homeassistant.components.stream.Stream", wraps=MockStream):
        stream = create_stream(hass, source, {})

    with patch.object(hass.config, "is_allowed_path", return_value=True):
        make_recording = hass.async_create_task(stream.async_record(filename))

        # In general usage the recorder will only include what has already been
        # processed by the worker. To guarantee we have some output for the test,
        # wait until the worker has finished before firing
        await worker_finished.wait()

        # Fire the IdleTimer
        future = dt_util.utcnow() + timedelta(seconds=30)
        async_fire_time_changed(hass, future)

        await make_recording

    # Assert
    assert os.path.exists(filename)

    result = av.open(

    assert len(result.streams.audio) == expected_audio_streams
    await stream.stop()
    await hass.async_block_till_done()

async def test_recorder_log(hass, filename, caplog):
    """Test starting a stream to record logs the url without username and password."""
    stream = create_stream(hass, "https://abcd:efgh@foo.bar", {})
    with patch.object(hass.config, "is_allowed_path", return_value=True):
        await stream.async_record(filename)
    assert "https://abcd:efgh@foo.bar" not in caplog.text
    assert "https://****:****@foo.bar" in caplog.text

async def test_record_stream_rotate(hass, filename, h264_video):
    """Test record stream with rotation."""

    worker_finished = asyncio.Event()

    class MockStream(Stream):
        """Mock Stream so we can patch remove_provider."""

        async def remove_provider(self, provider):
            """Add a finished event to Stream.remove_provider."""
            await Stream.remove_provider(self, provider)

    with patch("homeassistant.components.stream.Stream", wraps=MockStream):
        stream = create_stream(hass, h264_video, {})
        stream.orientation = 8

    with patch.object(hass.config, "is_allowed_path", return_value=True):
        make_recording = hass.async_create_task(stream.async_record(filename))

        # In general usage the recorder will only include what has already been
        # processed by the worker. To guarantee we have some output for the test,
        # wait until the worker has finished before firing
        await worker_finished.wait()

        # Fire the IdleTimer
        future = dt_util.utcnow() + timedelta(seconds=30)
        async_fire_time_changed(hass, future)

        await make_recording

    # Assert
    assert os.path.exists(filename)
    with open(filename, "rb") as rotated_mp4:
        assert_mp4_has_transform_matrix(rotated_mp4.read(), stream.orientation)