Ensure homekit camera stream can be restarted after failure (#35384)

* Ensure camera stream can be restarted after failure

* If ffmpeg failed to start, was killed, or the iOS device
closed the stream right away, the stream could never
be started until the HomeKit bridge was restarted.

* watch ffmpeg instead of checking only once

* handle forceful shutdowns gracefully

* Increase coverage
This commit is contained in:
J. Nick Koston 2020-05-11 08:17:10 -05:00 committed by GitHub
parent f4e4ea10e5
commit 31ee54c133
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 200 additions and 27 deletions

View file

@ -98,6 +98,7 @@ TYPE_VALVE = "valve"
SERV_ACCESSORY_INFO = "AccessoryInformation" SERV_ACCESSORY_INFO = "AccessoryInformation"
SERV_AIR_QUALITY_SENSOR = "AirQualitySensor" SERV_AIR_QUALITY_SENSOR = "AirQualitySensor"
SERV_BATTERY_SERVICE = "BatteryService" SERV_BATTERY_SERVICE = "BatteryService"
SERV_CAMERA_RTP_STREAM_MANAGEMENT = "CameraRTPStreamManagement"
SERV_CARBON_DIOXIDE_SENSOR = "CarbonDioxideSensor" SERV_CARBON_DIOXIDE_SENSOR = "CarbonDioxideSensor"
SERV_CARBON_MONOXIDE_SENSOR = "CarbonMonoxideSensor" SERV_CARBON_MONOXIDE_SENSOR = "CarbonMonoxideSensor"
SERV_CONTACT_SENSOR = "ContactSensor" SERV_CONTACT_SENSOR = "ContactSensor"
@ -177,6 +178,7 @@ CHAR_SERIAL_NUMBER = "SerialNumber"
CHAR_SLEEP_DISCOVER_MODE = "SleepDiscoveryMode" CHAR_SLEEP_DISCOVER_MODE = "SleepDiscoveryMode"
CHAR_SMOKE_DETECTED = "SmokeDetected" CHAR_SMOKE_DETECTED = "SmokeDetected"
CHAR_STATUS_LOW_BATTERY = "StatusLowBattery" CHAR_STATUS_LOW_BATTERY = "StatusLowBattery"
CHAR_STREAMING_STRATUS = "StreamingStatus"
CHAR_SWING_MODE = "SwingMode" CHAR_SWING_MODE = "SwingMode"
CHAR_TARGET_DOOR_STATE = "TargetDoorState" CHAR_TARGET_DOOR_STATE = "TargetDoorState"
CHAR_TARGET_HEATING_COOLING = "TargetHeatingCoolingState" CHAR_TARGET_HEATING_COOLING = "TargetHeatingCoolingState"

View file

@ -1,9 +1,11 @@
"""Class to hold all camera accessories.""" """Class to hold all camera accessories."""
import asyncio import asyncio
from datetime import timedelta
import logging import logging
from haffmpeg.core import HAFFmpeg from haffmpeg.core import HAFFmpeg
from pyhap.camera import ( from pyhap.camera import (
STREAMING_STATUS,
VIDEO_CODEC_PARAM_LEVEL_TYPES, VIDEO_CODEC_PARAM_LEVEL_TYPES,
VIDEO_CODEC_PARAM_PROFILE_ID_TYPES, VIDEO_CODEC_PARAM_PROFILE_ID_TYPES,
Camera as PyhapCamera, Camera as PyhapCamera,
@ -13,10 +15,12 @@ from pyhap.const import CATEGORY_CAMERA
from homeassistant.components.camera.const import DOMAIN as DOMAIN_CAMERA from homeassistant.components.camera.const import DOMAIN as DOMAIN_CAMERA
from homeassistant.components.ffmpeg import DATA_FFMPEG from homeassistant.components.ffmpeg import DATA_FFMPEG
from homeassistant.core import callback from homeassistant.core import callback
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.util import get_local_ip from homeassistant.util import get_local_ip
from .accessories import TYPES, HomeAccessory from .accessories import TYPES, HomeAccessory
from .const import ( from .const import (
CHAR_STREAMING_STRATUS,
CONF_AUDIO_CODEC, CONF_AUDIO_CODEC,
CONF_AUDIO_MAP, CONF_AUDIO_MAP,
CONF_AUDIO_PACKET_SIZE, CONF_AUDIO_PACKET_SIZE,
@ -29,9 +33,10 @@ from .const import (
CONF_VIDEO_CODEC, CONF_VIDEO_CODEC,
CONF_VIDEO_MAP, CONF_VIDEO_MAP,
CONF_VIDEO_PACKET_SIZE, CONF_VIDEO_PACKET_SIZE,
SERV_CAMERA_RTP_STREAM_MANAGEMENT,
) )
from .img_util import scale_jpeg_camera_image from .img_util import scale_jpeg_camera_image
from .util import CAMERA_SCHEMA from .util import CAMERA_SCHEMA, pid_is_alive
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -84,6 +89,11 @@ RESOLUTIONS = [
VIDEO_PROFILE_NAMES = ["baseline", "main", "high"] VIDEO_PROFILE_NAMES = ["baseline", "main", "high"]
FFMPEG_WATCH_INTERVAL = timedelta(seconds=5)
FFMPEG_WATCHER = "ffmpeg_watcher"
FFMPEG_PID = "ffmpeg_pid"
SESSION_ID = "session_id"
@TYPES.register("Camera") @TYPES.register("Camera")
class Camera(HomeAccessory, PyhapCamera): class Camera(HomeAccessory, PyhapCamera):
@ -92,6 +102,7 @@ class Camera(HomeAccessory, PyhapCamera):
def __init__(self, hass, driver, name, entity_id, aid, config): def __init__(self, hass, driver, name, entity_id, aid, config):
"""Initialize a Camera accessory object.""" """Initialize a Camera accessory object."""
self._ffmpeg = hass.data[DATA_FFMPEG] self._ffmpeg = hass.data[DATA_FFMPEG]
self._cur_session = None
self._camera = hass.data[DOMAIN_CAMERA] self._camera = hass.data[DOMAIN_CAMERA]
config_w_defaults = CAMERA_SCHEMA(config) config_w_defaults = CAMERA_SCHEMA(config)
@ -159,11 +170,14 @@ class Camera(HomeAccessory, PyhapCamera):
if stream_source: if stream_source:
return stream_source return stream_source
try: try:
return await camera.stream_source() stream_source = await camera.stream_source()
except Exception: # pylint: disable=broad-except except Exception: # pylint: disable=broad-except
_LOGGER.exception( _LOGGER.exception(
"Failed to get stream source - this could be a transient error or your camera might not be compatible with HomeKit yet" "Failed to get stream source - this could be a transient error or your camera might not be compatible with HomeKit yet"
) )
if stream_source:
self.config[CONF_STREAM_SOURCE] = stream_source
return stream_source
async def start_stream(self, session_info, stream_config): async def start_stream(self, session_info, stream_config):
"""Start a new stream with the given configuration.""" """Start a new stream with the given configuration."""
@ -222,7 +236,45 @@ class Camera(HomeAccessory, PyhapCamera):
session_info["id"], session_info["id"],
stream.process.pid, stream.process.pid,
) )
return True
ffmpeg_watcher = async_track_time_interval(
self.hass, self._async_ffmpeg_watch, FFMPEG_WATCH_INTERVAL
)
self._cur_session = {
FFMPEG_WATCHER: ffmpeg_watcher,
FFMPEG_PID: stream.process.pid,
SESSION_ID: session_info["id"],
}
return await self._async_ffmpeg_watch(0)
async def _async_ffmpeg_watch(self, _):
"""Check to make sure ffmpeg is still running and cleanup if not."""
ffmpeg_pid = self._cur_session[FFMPEG_PID]
session_id = self._cur_session[SESSION_ID]
if pid_is_alive(ffmpeg_pid):
return True
_LOGGER.warning("Streaming process ended unexpectedly - PID %d", ffmpeg_pid)
self._async_stop_ffmpeg_watch()
self._async_set_streaming_available(session_id)
return False
@callback
def _async_stop_ffmpeg_watch(self):
"""Cleanup a streaming session after stopping."""
if not self._cur_session:
return
self._cur_session[FFMPEG_WATCHER]()
self._cur_session = None
@callback
def _async_set_streaming_available(self, session_id):
"""Free the session so they can start another."""
self.streaming_status = STREAMING_STATUS["AVAILABLE"]
self.get_service(SERV_CAMERA_RTP_STREAM_MANAGEMENT).get_characteristic(
CHAR_STREAMING_STRATUS
).notify()
async def stop_stream(self, session_info): async def stop_stream(self, session_info):
"""Stop the stream for the given ``session_id``.""" """Stop the stream for the given ``session_id``."""
@ -230,19 +282,23 @@ class Camera(HomeAccessory, PyhapCamera):
stream = session_info.get("stream") stream = session_info.get("stream")
if not stream: if not stream:
_LOGGER.debug("No stream for session ID %s", session_id) _LOGGER.debug("No stream for session ID %s", session_id)
_LOGGER.info("[%s] Stopping stream.", session_id)
try:
await stream.close()
return return
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Failed to gracefully close stream.")
try: self._async_stop_ffmpeg_watch()
await stream.kill()
except Exception: # pylint: disable=broad-except if not pid_is_alive(stream.process.pid):
_LOGGER.exception("Failed to forcefully close stream.") _LOGGER.info("[%s] Stream already stopped.", session_id)
_LOGGER.debug("Stream process stopped forcefully.") return True
for shutdown_method in ["close", "kill"]:
_LOGGER.info("[%s] %s stream.", session_id, shutdown_method)
try:
await getattr(stream, shutdown_method)()
return
except Exception: # pylint: disable=broad-except
_LOGGER.exception(
"[%s] Failed to %s stream.", session_id, shutdown_method
)
async def reconfigure_stream(self, session_info, stream_config): async def reconfigure_stream(self, session_info, stream_config):
"""Reconfigure the stream so that it uses the given ``stream_config``.""" """Reconfigure the stream so that it uses the given ``stream_config``."""

View file

@ -472,3 +472,13 @@ def find_next_available_port(start_port: int):
if port == MAX_PORT: if port == MAX_PORT:
raise raise
continue continue
def pid_is_alive(pid):
"""Check to see if a process is alive."""
try:
os.kill(pid, 0)
return True
except OSError:
pass
return False

View file

@ -14,6 +14,7 @@ from homeassistant.components.homekit.const import (
CONF_SUPPORT_AUDIO, CONF_SUPPORT_AUDIO,
CONF_VIDEO_CODEC, CONF_VIDEO_CODEC,
VIDEO_CODEC_COPY, VIDEO_CODEC_COPY,
VIDEO_CODEC_H264_OMX,
) )
from homeassistant.components.homekit.img_util import TurboJPEGSingleton from homeassistant.components.homekit.img_util import TurboJPEGSingleton
from homeassistant.components.homekit.type_cameras import Camera from homeassistant.components.homekit.type_cameras import Camera
@ -23,12 +24,14 @@ from homeassistant.setup import async_setup_component
from .common import mock_turbo_jpeg from .common import mock_turbo_jpeg
from tests.async_mock import AsyncMock, MagicMock, patch from tests.async_mock import AsyncMock, MagicMock, PropertyMock, patch
MOCK_START_STREAM_TLV = "ARUCAQEBEDMD1QMXzEaatnKSQ2pxovYCNAEBAAIJAQECAgECAwEAAwsBAgAFAgLQAgMBHgQXAQFjAgQ768/RAwIrAQQEAAAAPwUCYgUDLAEBAwIMAQEBAgEAAwECBAEUAxYBAW4CBCzq28sDAhgABAQAAKBABgENBAEA" MOCK_START_STREAM_TLV = "ARUCAQEBEDMD1QMXzEaatnKSQ2pxovYCNAEBAAIJAQECAgECAwEAAwsBAgAFAgLQAgMBHgQXAQFjAgQ768/RAwIrAQQEAAAAPwUCYgUDLAEBAwIMAQEBAgEAAwECBAEUAxYBAW4CBCzq28sDAhgABAQAAKBABgENBAEA"
MOCK_END_POINTS_TLV = "ARAzA9UDF8xGmrZykkNqcaL2AgEAAxoBAQACDTE5Mi4xNjguMjA4LjUDAi7IBAKkxwQlAQEAAhDN0+Y0tZ4jzoO0ske9UsjpAw6D76oVXnoi7DbawIG4CwUlAQEAAhCyGcROB8P7vFRDzNF2xrK1Aw6NdcLugju9yCfkWVSaVAYEDoAsAAcEpxV8AA==" MOCK_END_POINTS_TLV = "ARAzA9UDF8xGmrZykkNqcaL2AgEAAxoBAQACDTE5Mi4xNjguMjA4LjUDAi7IBAKkxwQlAQEAAhDN0+Y0tZ4jzoO0ske9UsjpAw6D76oVXnoi7DbawIG4CwUlAQEAAhCyGcROB8P7vFRDzNF2xrK1Aw6NdcLugju9yCfkWVSaVAYEDoAsAAcEpxV8AA=="
MOCK_START_STREAM_SESSION_UUID = UUID("3303d503-17cc-469a-b672-92436a71a2f6") MOCK_START_STREAM_SESSION_UUID = UUID("3303d503-17cc-469a-b672-92436a71a2f6")
PID_THAT_WILL_NEVER_BE_ALIVE = 2147483647
async def _async_start_streaming(hass, acc): async def _async_start_streaming(hass, acc):
"""Start streaming a camera.""" """Start streaming a camera."""
@ -44,13 +47,27 @@ async def _async_setup_endpoints(hass, acc):
await hass.async_block_till_done() await hass.async_block_till_done()
async def _async_stop_stream(hass, acc): async def _async_reconfigure_stream(hass, acc, session_info, stream_config):
"""Stop a camera stream.""" """Reconfigure the stream."""
await acc.reconfigure_stream(session_info, stream_config)
await acc.run_handler()
await hass.async_block_till_done()
async def _async_stop_all_streams(hass, acc):
"""Stop all camera streams."""
await acc.stop() await acc.stop()
await acc.run_handler() await acc.run_handler()
await hass.async_block_till_done() await hass.async_block_till_done()
async def _async_stop_stream(hass, acc, session_info):
"""Stop a camera stream."""
await acc.stop_stream(session_info)
await acc.run_handler()
await hass.async_block_till_done()
@pytest.fixture() @pytest.fixture()
def run_driver(hass): def run_driver(hass):
"""Return a custom AccessoryDriver instance for HomeKit accessory init.""" """Return a custom AccessoryDriver instance for HomeKit accessory init."""
@ -66,6 +83,16 @@ def run_driver(hass):
) )
def _get_exits_after_startup_mock_ffmpeg():
"""Return a ffmpeg that will have an invalid pid."""
ffmpeg = MagicMock()
type(ffmpeg.process).pid = PropertyMock(return_value=PID_THAT_WILL_NEVER_BE_ALIVE)
ffmpeg.open = AsyncMock(return_value=True)
ffmpeg.close = AsyncMock(return_value=True)
ffmpeg.kill = AsyncMock(return_value=True)
return ffmpeg
def _get_working_mock_ffmpeg(): def _get_working_mock_ffmpeg():
"""Return a working ffmpeg.""" """Return a working ffmpeg."""
ffmpeg = MagicMock() ffmpeg = MagicMock()
@ -78,6 +105,7 @@ def _get_working_mock_ffmpeg():
def _get_failing_mock_ffmpeg(): def _get_failing_mock_ffmpeg():
"""Return an ffmpeg that fails to shutdown.""" """Return an ffmpeg that fails to shutdown."""
ffmpeg = MagicMock() ffmpeg = MagicMock()
type(ffmpeg.process).pid = PropertyMock(return_value=PID_THAT_WILL_NEVER_BE_ALIVE)
ffmpeg.open = AsyncMock(return_value=False) ffmpeg.open = AsyncMock(return_value=False)
ffmpeg.close = AsyncMock(side_effect=OSError) ffmpeg.close = AsyncMock(side_effect=OSError)
ffmpeg.kill = AsyncMock(side_effect=OSError) ffmpeg.kill = AsyncMock(side_effect=OSError)
@ -125,7 +153,7 @@ async def test_camera_stream_source_configured(hass, run_driver, events):
return_value=working_ffmpeg, return_value=working_ffmpeg,
): ):
await _async_start_streaming(hass, acc) await _async_start_streaming(hass, acc)
await _async_stop_stream(hass, acc) await _async_stop_all_streams(hass, acc)
expected_output = ( expected_output = (
"-map 0:v:0 -an -c:v libx264 -profile:v high -tune zerolatency -pix_fmt " "-map 0:v:0 -an -c:v libx264 -profile:v high -tune zerolatency -pix_fmt "
@ -146,6 +174,10 @@ async def test_camera_stream_source_configured(hass, run_driver, events):
stdout_pipe=False, stdout_pipe=False,
) )
await _async_setup_endpoints(hass, acc)
working_ffmpeg = _get_working_mock_ffmpeg()
session_info = acc.sessions[MOCK_START_STREAM_SESSION_UUID]
with patch( with patch(
"homeassistant.components.demo.camera.DemoCamera.stream_source", "homeassistant.components.demo.camera.DemoCamera.stream_source",
return_value="rtsp://example.local", return_value="rtsp://example.local",
@ -154,9 +186,9 @@ async def test_camera_stream_source_configured(hass, run_driver, events):
return_value=working_ffmpeg, return_value=working_ffmpeg,
): ):
await _async_start_streaming(hass, acc) await _async_start_streaming(hass, acc)
await _async_stop_stream(hass, acc) await _async_stop_all_streams(hass, acc)
# Calling a second time should not throw # Calling a second time should not throw
await _async_stop_stream(hass, acc) await _async_stop_all_streams(hass, acc)
turbo_jpeg = mock_turbo_jpeg( turbo_jpeg = mock_turbo_jpeg(
first_width=16, first_height=12, second_width=300, second_height=200 first_width=16, first_height=12, second_width=300, second_height=200
@ -225,9 +257,9 @@ async def test_camera_stream_source_configured_with_failing_ffmpeg(
return_value=_get_failing_mock_ffmpeg(), return_value=_get_failing_mock_ffmpeg(),
): ):
await _async_start_streaming(hass, acc) await _async_start_streaming(hass, acc)
await _async_stop_stream(hass, acc) await _async_stop_all_streams(hass, acc)
# Calling a second time should not throw # Calling a second time should not throw
await _async_stop_stream(hass, acc) await _async_stop_all_streams(hass, acc)
async def test_camera_stream_source_found(hass, run_driver, events): async def test_camera_stream_source_found(hass, run_driver, events):
@ -257,7 +289,9 @@ async def test_camera_stream_source_found(hass, run_driver, events):
return_value=_get_working_mock_ffmpeg(), return_value=_get_working_mock_ffmpeg(),
): ):
await _async_start_streaming(hass, acc) await _async_start_streaming(hass, acc)
await _async_stop_stream(hass, acc) await _async_stop_all_streams(hass, acc)
await _async_setup_endpoints(hass, acc)
with patch( with patch(
"homeassistant.components.demo.camera.DemoCamera.stream_source", "homeassistant.components.demo.camera.DemoCamera.stream_source",
@ -267,7 +301,7 @@ async def test_camera_stream_source_found(hass, run_driver, events):
return_value=_get_working_mock_ffmpeg(), return_value=_get_working_mock_ffmpeg(),
): ):
await _async_start_streaming(hass, acc) await _async_start_streaming(hass, acc)
await _async_stop_stream(hass, acc) await _async_stop_all_streams(hass, acc)
async def test_camera_stream_source_fails(hass, run_driver, events): async def test_camera_stream_source_fails(hass, run_driver, events):
@ -297,7 +331,7 @@ async def test_camera_stream_source_fails(hass, run_driver, events):
return_value=_get_working_mock_ffmpeg(), return_value=_get_working_mock_ffmpeg(),
): ):
await _async_start_streaming(hass, acc) await _async_start_streaming(hass, acc)
await _async_stop_stream(hass, acc) await _async_stop_all_streams(hass, acc)
async def test_camera_with_no_stream(hass, run_driver, events): async def test_camera_with_no_stream(hass, run_driver, events):
@ -317,7 +351,7 @@ async def test_camera_with_no_stream(hass, run_driver, events):
await _async_setup_endpoints(hass, acc) await _async_setup_endpoints(hass, acc)
await _async_start_streaming(hass, acc) await _async_start_streaming(hass, acc)
await _async_stop_stream(hass, acc) await _async_stop_all_streams(hass, acc)
with pytest.raises(HomeAssistantError): with pytest.raises(HomeAssistantError):
await hass.async_add_executor_job( await hass.async_add_executor_job(
@ -370,7 +404,9 @@ async def test_camera_stream_source_configured_and_copy_codec(hass, run_driver,
return_value=working_ffmpeg, return_value=working_ffmpeg,
): ):
await _async_start_streaming(hass, acc) await _async_start_streaming(hass, acc)
await _async_stop_stream(hass, acc) await _async_reconfigure_stream(hass, acc, session_info, {})
await _async_stop_stream(hass, acc, session_info)
await _async_stop_all_streams(hass, acc)
expected_output = ( expected_output = (
"-map 0:v:0 -an -c:v copy -tune zerolatency -pix_fmt yuv420p -r 30 -b:v 299k " "-map 0:v:0 -an -c:v copy -tune zerolatency -pix_fmt yuv420p -r 30 -b:v 299k "
@ -389,3 +425,72 @@ async def test_camera_stream_source_configured_and_copy_codec(hass, run_driver,
output=expected_output.format(**session_info), output=expected_output.format(**session_info),
stdout_pipe=False, stdout_pipe=False,
) )
async def test_camera_streaming_fails_after_starting_ffmpeg(hass, run_driver, events):
"""Test a camera that can stream with a configured source."""
await async_setup_component(hass, ffmpeg.DOMAIN, {ffmpeg.DOMAIN: {}})
await async_setup_component(
hass, camera.DOMAIN, {camera.DOMAIN: {"platform": "demo"}}
)
entity_id = "camera.demo_camera"
hass.states.async_set(entity_id, None)
await hass.async_block_till_done()
acc = Camera(
hass,
run_driver,
"Camera",
entity_id,
2,
{
CONF_STREAM_SOURCE: "/dev/null",
CONF_SUPPORT_AUDIO: True,
CONF_VIDEO_CODEC: VIDEO_CODEC_H264_OMX,
CONF_AUDIO_CODEC: AUDIO_CODEC_COPY,
},
)
bridge = HomeBridge("hass", run_driver, "Test Bridge")
bridge.add_accessory(acc)
await acc.run_handler()
assert acc.aid == 2
assert acc.category == 17 # Camera
await _async_setup_endpoints(hass, acc)
session_info = acc.sessions[MOCK_START_STREAM_SESSION_UUID]
ffmpeg_with_invalid_pid = _get_exits_after_startup_mock_ffmpeg()
with patch(
"homeassistant.components.demo.camera.DemoCamera.stream_source",
return_value=None,
), patch(
"homeassistant.components.homekit.type_cameras.HAFFmpeg",
return_value=ffmpeg_with_invalid_pid,
):
await _async_start_streaming(hass, acc)
await _async_reconfigure_stream(hass, acc, session_info, {})
# Should not throw
await _async_stop_stream(hass, acc, {"id": "does_not_exist"})
await _async_stop_all_streams(hass, acc)
expected_output = (
"-map 0:v:0 -an -c:v h264_omx -profile:v high -tune zerolatency -pix_fmt yuv420p -r 30 -b:v 299k "
"-bufsize 1196k -maxrate 299k -payload_type 99 -ssrc {v_ssrc} -f rtp -srtp_out_suite "
"AES_CM_128_HMAC_SHA1_80 -srtp_out_params zdPmNLWeI86DtLJHvVLI6YPvqhVeeiLsNtrAgbgL "
"srtp://192.168.208.5:51246?rtcpport=51246&localrtcpport=51246&pkt_size=1316 -map 0:a:0 "
"-vn -c:a copy -ac 1 -ar 24k -b:a 24k -bufsize 96k -payload_type 110 -ssrc {a_ssrc} "
"-f rtp -srtp_out_suite AES_CM_128_HMAC_SHA1_80 -srtp_out_params "
"shnETgfD+7xUQ8zRdsaytY11wu6CO73IJ+RZVJpU "
"srtp://192.168.208.5:51108?rtcpport=51108&localrtcpport=51108&pkt_size=188"
)
ffmpeg_with_invalid_pid.open.assert_called_with(
cmd=[],
input_source="-i /dev/null",
output=expected_output.format(**session_info),
stdout_pipe=False,
)