Refactor ONVIF (#35222)

This commit is contained in:
Jason Hunter 2020-05-06 12:29:59 -04:00 committed by GitHub
parent 0b8f8db67b
commit 19734e7b2c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 659 additions and 522 deletions

View file

@ -509,7 +509,9 @@ omit =
homeassistant/components/onewire/sensor.py homeassistant/components/onewire/sensor.py
homeassistant/components/onkyo/media_player.py homeassistant/components/onkyo/media_player.py
homeassistant/components/onvif/__init__.py homeassistant/components/onvif/__init__.py
homeassistant/components/onvif/base.py
homeassistant/components/onvif/camera.py homeassistant/components/onvif/camera.py
homeassistant/components/onvif/device.py
homeassistant/components/opencv/* homeassistant/components/opencv/*
homeassistant/components/openevse/sensor.py homeassistant/components/openevse/sensor.py
homeassistant/components/openexchangerates/sensor.py homeassistant/components/openexchangerates/sensor.py

View file

@ -13,6 +13,7 @@ from homeassistant.const import (
CONF_USERNAME, CONF_USERNAME,
) )
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers import config_per_platform from homeassistant.helpers import config_per_platform
from .const import ( from .const import (
@ -25,6 +26,7 @@ from .const import (
DOMAIN, DOMAIN,
RTSP_TRANS_PROTOCOLS, RTSP_TRANS_PROTOCOLS,
) )
from .device import ONVIFDevice
CONFIG_SCHEMA = vol.Schema({DOMAIN: vol.Schema({})}, extra=vol.ALLOW_EXTRA) CONFIG_SCHEMA = vol.Schema({DOMAIN: vol.Schema({})}, extra=vol.ALLOW_EXTRA)
@ -61,9 +63,22 @@ async def async_setup(hass: HomeAssistant, config: dict):
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry): async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry):
"""Set up ONVIF from a config entry.""" """Set up ONVIF from a config entry."""
if DOMAIN not in hass.data:
hass.data[DOMAIN] = {}
if not entry.options: if not entry.options:
await async_populate_options(hass, entry) await async_populate_options(hass, entry)
device = ONVIFDevice(hass, entry)
if not await device.async_setup():
return False
if not device.available:
raise ConfigEntryNotReady()
hass.data[DOMAIN][entry.unique_id] = device
for component in PLATFORMS: for component in PLATFORMS:
hass.async_create_task( hass.async_create_task(
hass.config_entries.async_forward_entry_setup(entry, component) hass.config_entries.async_forward_entry_setup(entry, component)

View file

@ -0,0 +1,31 @@
"""Base classes for ONVIF entities."""
from homeassistant.helpers.device_registry import CONNECTION_NETWORK_MAC
from homeassistant.helpers.entity import Entity
from .device import ONVIFDevice
from .models import Profile
class ONVIFBaseEntity(Entity):
"""Base class common to all ONVIF entities."""
def __init__(self, device: ONVIFDevice, profile: Profile) -> None:
"""Initialize the ONVIF entity."""
self.device = device
self.profile = profile
@property
def available(self):
"""Return True if device is available."""
return self.device.available
@property
def device_info(self):
"""Return a device description for device registry."""
return {
"connections": {(CONNECTION_NETWORK_MAC, self.device.info.mac)},
"manufacturer": self.device.info.manufacturer,
"model": self.device.info.model,
"name": self.device.name,
"sw_version": self.device.info.fw_version,
}

View file

@ -1,37 +1,18 @@
"""Support for ONVIF Cameras with FFmpeg as decoder.""" """Support for ONVIF Cameras with FFmpeg as decoder."""
import asyncio import asyncio
import datetime as dt
import os
from typing import Optional
from aiohttp.client_exceptions import ClientConnectionError, ServerDisconnectedError
from haffmpeg.camera import CameraMjpeg from haffmpeg.camera import CameraMjpeg
from haffmpeg.tools import IMAGE_JPEG, ImageFrame from haffmpeg.tools import IMAGE_JPEG, ImageFrame
import onvif
from onvif import ONVIFCamera, exceptions
import requests import requests
from requests.auth import HTTPDigestAuth from requests.auth import HTTPDigestAuth
import voluptuous as vol import voluptuous as vol
from zeep.asyncio import AsyncTransport
from zeep.exceptions import Fault
from homeassistant.components.camera import SUPPORT_STREAM, Camera from homeassistant.components.camera import SUPPORT_STREAM, Camera
from homeassistant.components.ffmpeg import CONF_EXTRA_ARGUMENTS, DATA_FFMPEG from homeassistant.components.ffmpeg import CONF_EXTRA_ARGUMENTS, DATA_FFMPEG
from homeassistant.const import (
CONF_HOST,
CONF_NAME,
CONF_PASSWORD,
CONF_PORT,
CONF_USERNAME,
)
from homeassistant.exceptions import PlatformNotReady
from homeassistant.helpers import config_validation as cv, entity_platform from homeassistant.helpers import config_validation as cv, entity_platform
from homeassistant.helpers.aiohttp_client import ( from homeassistant.helpers.aiohttp_client import async_aiohttp_proxy_stream
async_aiohttp_proxy_stream,
async_get_clientsession,
)
import homeassistant.util.dt as dt_util
from .base import ONVIFBaseEntity
from .const import ( from .const import (
ABSOLUTE_MOVE, ABSOLUTE_MOVE,
ATTR_CONTINUOUS_DURATION, ATTR_CONTINUOUS_DURATION,
@ -42,7 +23,6 @@ from .const import (
ATTR_SPEED, ATTR_SPEED,
ATTR_TILT, ATTR_TILT,
ATTR_ZOOM, ATTR_ZOOM,
CONF_PROFILE,
CONF_RTSP_TRANSPORT, CONF_RTSP_TRANSPORT,
CONTINUOUS_MOVE, CONTINUOUS_MOVE,
DIR_DOWN, DIR_DOWN,
@ -50,14 +30,10 @@ from .const import (
DIR_RIGHT, DIR_RIGHT,
DIR_UP, DIR_UP,
DOMAIN, DOMAIN,
ENTITIES,
GOTOPRESET_MOVE, GOTOPRESET_MOVE,
LOGGER, LOGGER,
PAN_FACTOR,
RELATIVE_MOVE, RELATIVE_MOVE,
SERVICE_PTZ, SERVICE_PTZ,
TILT_FACTOR,
ZOOM_FACTOR,
ZOOM_IN, ZOOM_IN,
ZOOM_OUT, ZOOM_OUT,
) )
@ -85,414 +61,98 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
"async_perform_ptz", "async_perform_ptz",
) )
base_config = { device = hass.data[DOMAIN][config_entry.unique_id]
CONF_NAME: config_entry.data[CONF_NAME], async_add_entities(
CONF_HOST: config_entry.data[CONF_HOST], [ONVIFCameraEntity(device, profile) for profile in device.profiles]
CONF_PORT: config_entry.data[CONF_PORT], )
CONF_USERNAME: config_entry.data[CONF_USERNAME],
CONF_PASSWORD: config_entry.data[CONF_PASSWORD],
CONF_EXTRA_ARGUMENTS: config_entry.options[CONF_EXTRA_ARGUMENTS],
CONF_RTSP_TRANSPORT: config_entry.options[CONF_RTSP_TRANSPORT],
}
entities = []
for profile in config_entry.data[CONF_PROFILE]:
config = {**base_config, CONF_PROFILE: profile}
camera = ONVIFHassCamera(hass, config)
await camera.async_initialize()
entities.append(camera)
async_add_entities(entities)
return True return True
class ONVIFHassCamera(Camera): class ONVIFCameraEntity(ONVIFBaseEntity, Camera):
"""An implementation of an ONVIF camera.""" """Representation of an ONVIF camera."""
def __init__(self, hass, config): def __init__(self, device, profile):
"""Initialize an ONVIF camera.""" """Initialize ONVIF camera entity."""
super().__init__() ONVIFBaseEntity.__init__(self, device, profile)
Camera.__init__(self)
LOGGER.debug("Importing dependencies") self.stream_options[CONF_RTSP_TRANSPORT] = device.config_entry.options.get(
CONF_RTSP_TRANSPORT
LOGGER.debug("Setting up the ONVIF camera component")
self._username = config.get(CONF_USERNAME)
self._password = config.get(CONF_PASSWORD)
self._host = config.get(CONF_HOST)
self._port = config.get(CONF_PORT)
self._name = config.get(CONF_NAME)
self._ffmpeg_arguments = config.get(CONF_EXTRA_ARGUMENTS)
self._profile_index = config.get(CONF_PROFILE)
self._profile_token = None
self._profile_name = None
self._ptz_service = None
self._input = None
self._snapshot = None
self.stream_options[CONF_RTSP_TRANSPORT] = config.get(CONF_RTSP_TRANSPORT)
self._manufacturer = None
self._model = None
self._firmware_version = None
self._mac = None
LOGGER.debug(
"Setting up the ONVIF camera device @ '%s:%s'", self._host, self._port
) )
self._stream_uri = None
self._snapshot_uri = None
session = async_get_clientsession(hass) @property
transport = AsyncTransport(None, session=session) def supported_features(self) -> int:
self._camera = ONVIFCamera( """Return supported features."""
self._host, return SUPPORT_STREAM
self._port,
self._username,
self._password,
f"{os.path.dirname(onvif.__file__)}/wsdl/",
transport=transport,
)
async def async_initialize(self): @property
""" def name(self) -> str:
Initialize the camera. """Return the name of this camera."""
return f"{self.device.name} - {self.profile.name}"
Initializes the camera by obtaining the input uri and connecting to @property
the camera. Also retrieves the ONVIF profiles. def unique_id(self) -> str:
""" """Return a unique ID."""
try: if self.profile.index:
LOGGER.debug("Updating service addresses") return f"{self.device.info.mac}_{self.profile.index}"
await self._camera.update_xaddrs() return self.device.info.mac
await self.async_obtain_device_info() @property
await self.async_obtain_mac_address() def entity_registry_enabled_default(self) -> bool:
await self.async_check_date_and_time() """Return if the entity should be enabled when first added to the entity registry."""
await self.async_obtain_profile_token() return self.device.max_resolution == self.profile.video.resolution.width
await self.async_obtain_input_uri()
await self.async_obtain_snapshot_uri() async def stream_source(self):
self.setup_ptz() """Return the stream source."""
except ClientConnectionError as err: if self._stream_uri is None:
LOGGER.warning( uri_no_auth = await self.device.async_get_stream_uri(self.profile)
"Couldn't connect to camera '%s', but will retry later. Error: %s", self._stream_uri = uri_no_auth.replace(
self._name, "rtsp://", f"rtsp://{self.device.username}:{self.device.password}@", 1
err,
) )
raise PlatformNotReady return self._stream_uri
except Fault as err:
LOGGER.error(
"Couldn't connect to camera '%s', please verify "
"that the credentials are correct. Error: %s",
self._name,
err,
)
async def async_obtain_device_info(self):
"""Obtain the MAC address of the camera to use as the unique ID."""
devicemgmt = self._camera.create_devicemgmt_service()
device_info = await devicemgmt.GetDeviceInformation()
self._manufacturer = device_info.Manufacturer
self._model = device_info.Model
self._firmware_version = device_info.FirmwareVersion
async def async_obtain_mac_address(self):
"""Obtain the MAC address of the camera to use as the unique ID."""
devicemgmt = self._camera.create_devicemgmt_service()
network_interfaces = await devicemgmt.GetNetworkInterfaces()
for interface in network_interfaces:
if interface.Enabled:
self._mac = interface.Info.HwAddress
async def async_check_date_and_time(self):
"""Warns if camera and system date not synced."""
LOGGER.debug("Setting up the ONVIF device management service")
devicemgmt = self._camera.create_devicemgmt_service()
LOGGER.debug("Retrieving current camera date/time")
try:
system_date = dt_util.utcnow()
device_time = await devicemgmt.GetSystemDateAndTime()
if not device_time:
LOGGER.debug(
"""Couldn't get camera '%s' date/time.
GetSystemDateAndTime() return null/empty""",
self._name,
)
return
if device_time.UTCDateTime:
tzone = dt_util.UTC
cdate = device_time.UTCDateTime
else:
tzone = (
dt_util.get_time_zone(device_time.TimeZone)
or dt_util.DEFAULT_TIME_ZONE
)
cdate = device_time.LocalDateTime
if cdate is None:
LOGGER.warning("Could not retrieve date/time on this camera")
else:
cam_date = dt.datetime(
cdate.Date.Year,
cdate.Date.Month,
cdate.Date.Day,
cdate.Time.Hour,
cdate.Time.Minute,
cdate.Time.Second,
0,
tzone,
)
cam_date_utc = cam_date.astimezone(dt_util.UTC)
LOGGER.debug("TimeZone for date/time: %s", tzone)
LOGGER.debug("Camera date/time: %s", cam_date)
LOGGER.debug("Camera date/time in UTC: %s", cam_date_utc)
LOGGER.debug("System date/time: %s", system_date)
dt_diff = cam_date - system_date
dt_diff_seconds = dt_diff.total_seconds()
if dt_diff_seconds > 5:
LOGGER.warning(
"The date/time on the camera (UTC) is '%s', "
"which is different from the system '%s', "
"this could lead to authentication issues",
cam_date_utc,
system_date,
)
except ServerDisconnectedError as err:
LOGGER.warning(
"Couldn't get camera '%s' date/time. Error: %s", self._name, err
)
async def async_obtain_profile_token(self):
"""Obtain profile token to use with requests."""
try:
media_service = self._camera.get_service("media")
profiles = await media_service.GetProfiles()
LOGGER.debug("Retrieved '%d' profiles", len(profiles))
if self._profile_index >= len(profiles):
LOGGER.warning(
"ONVIF Camera '%s' doesn't provide profile %d."
" Using the last profile.",
self._name,
self._profile_index,
)
self._profile_index = -1
LOGGER.debug("Using profile index '%d'", self._profile_index)
self._profile_token = profiles[self._profile_index].token
self._profile_name = profiles[self._profile_index].Name
except exceptions.ONVIFError as err:
LOGGER.error(
"Couldn't retrieve profile token of camera '%s'. Error: %s",
self._name,
err,
)
async def async_obtain_input_uri(self):
"""Set the input uri for the camera."""
LOGGER.debug(
"Connecting with ONVIF Camera: %s on port %s", self._host, self._port
)
try:
LOGGER.debug("Retrieving stream uri")
# Fix Onvif setup error on Goke GK7102 based IP camera
# where we need to recreate media_service #26781
media_service = self._camera.create_media_service()
req = media_service.create_type("GetStreamUri")
req.ProfileToken = self._profile_token
req.StreamSetup = {
"Stream": "RTP-Unicast",
"Transport": {"Protocol": "RTSP"},
}
stream_uri = await media_service.GetStreamUri(req)
uri_no_auth = stream_uri.Uri
uri_for_log = uri_no_auth.replace("rtsp://", "rtsp://<user>:<password>@", 1)
self._input = uri_no_auth.replace(
"rtsp://", f"rtsp://{self._username}:{self._password}@", 1
)
LOGGER.debug(
"ONVIF Camera Using the following URL for %s: %s",
self._name,
uri_for_log,
)
except exceptions.ONVIFError as err:
LOGGER.error("Couldn't setup camera '%s'. Error: %s", self._name, err)
async def async_obtain_snapshot_uri(self):
"""Set the snapshot uri for the camera."""
LOGGER.debug(
"Connecting with ONVIF Camera: %s on port %s", self._host, self._port
)
try:
LOGGER.debug("Retrieving snapshot uri")
# Fix Onvif setup error on Goke GK7102 based IP camera
# where we need to recreate media_service #26781
media_service = self._camera.create_media_service()
req = media_service.create_type("GetSnapshotUri")
req.ProfileToken = self._profile_token
try:
snapshot_uri = await media_service.GetSnapshotUri(req)
self._snapshot = snapshot_uri.Uri
except ServerDisconnectedError as err:
LOGGER.debug("Camera does not support GetSnapshotUri: %s", err)
LOGGER.debug(
"ONVIF Camera Using the following URL for %s snapshot: %s",
self._name,
self._snapshot,
)
except exceptions.ONVIFError as err:
LOGGER.error("Couldn't setup camera '%s'. Error: %s", self._name, err)
def setup_ptz(self):
"""Set up PTZ if available."""
LOGGER.debug("Setting up the ONVIF PTZ service")
if self._camera.get_service("ptz", create=False) is None:
LOGGER.debug("PTZ is not available")
else:
self._ptz_service = self._camera.create_ptz_service()
LOGGER.debug("Completed set up of the ONVIF camera component")
async def async_perform_ptz(
self,
distance,
speed,
move_mode,
continuous_duration,
preset,
pan=None,
tilt=None,
zoom=None,
):
"""Perform a PTZ action on the camera."""
if self._ptz_service is None:
LOGGER.warning("PTZ actions are not supported on camera '%s'", self._name)
return
pan_val = distance * PAN_FACTOR.get(pan, 0)
tilt_val = distance * TILT_FACTOR.get(tilt, 0)
zoom_val = distance * ZOOM_FACTOR.get(zoom, 0)
speed_val = speed
preset_val = preset
LOGGER.debug(
"Calling %s PTZ | Pan = %4.2f | Tilt = %4.2f | Zoom = %4.2f | Speed = %4.2f | Preset = %s",
move_mode,
pan_val,
tilt_val,
zoom_val,
speed_val,
preset_val,
)
try:
req = self._ptz_service.create_type(move_mode)
req.ProfileToken = self._profile_token
if move_mode == CONTINUOUS_MOVE:
req.Velocity = {
"PanTilt": {"x": pan_val, "y": tilt_val},
"Zoom": {"x": zoom_val},
}
await self._ptz_service.ContinuousMove(req)
await asyncio.sleep(continuous_duration)
req = self._ptz_service.create_type("Stop")
req.ProfileToken = self._profile_token
await self._ptz_service.Stop({"ProfileToken": req.ProfileToken})
elif move_mode == RELATIVE_MOVE:
req.Translation = {
"PanTilt": {"x": pan_val, "y": tilt_val},
"Zoom": {"x": zoom_val},
}
req.Speed = {
"PanTilt": {"x": speed_val, "y": speed_val},
"Zoom": {"x": speed_val},
}
await self._ptz_service.RelativeMove(req)
elif move_mode == ABSOLUTE_MOVE:
req.Position = {
"PanTilt": {"x": pan_val, "y": tilt_val},
"Zoom": {"x": zoom_val},
}
req.Speed = {
"PanTilt": {"x": speed_val, "y": speed_val},
"Zoom": {"x": speed_val},
}
await self._ptz_service.AbsoluteMove(req)
elif move_mode == GOTOPRESET_MOVE:
req.PresetToken = preset_val
req.Speed = {
"PanTilt": {"x": speed_val, "y": speed_val},
"Zoom": {"x": speed_val},
}
await self._ptz_service.GotoPreset(req)
except exceptions.ONVIFError as err:
if "Bad Request" in err.reason:
self._ptz_service = None
LOGGER.debug("Camera '%s' doesn't support PTZ.", self._name)
else:
LOGGER.error("Error trying to perform PTZ action: %s", err)
async def async_added_to_hass(self):
"""Handle entity addition to hass."""
LOGGER.debug("Camera '%s' added to hass", self._name)
if DOMAIN not in self.hass.data:
self.hass.data[DOMAIN] = {}
self.hass.data[DOMAIN][ENTITIES] = []
self.hass.data[DOMAIN][ENTITIES].append(self)
async def async_camera_image(self): async def async_camera_image(self):
"""Return a still image response from the camera.""" """Return a still image response from the camera."""
LOGGER.debug("Retrieving image from camera '%s'", self._name)
image = None image = None
if self._snapshot is not None: if self.device.capabilities.snapshot:
if self._snapshot_uri is None:
self._snapshot_uri = await self.device.async_get_snapshot_uri(
self.profile
)
auth = None auth = None
if self._username and self._password: if self.device.username and self.device.password:
auth = HTTPDigestAuth(self._username, self._password) auth = HTTPDigestAuth(self.device.username, self.device.password)
def fetch(): def fetch():
"""Read image from a URL.""" """Read image from a URL."""
try: try:
response = requests.get(self._snapshot, timeout=5, auth=auth) response = requests.get(self._snapshot_uri, timeout=5, auth=auth)
if response.status_code < 300: if response.status_code < 300:
return response.content return response.content
except requests.exceptions.RequestException as error: except requests.exceptions.RequestException as error:
LOGGER.error( LOGGER.error(
"Fetch snapshot image failed from %s, falling back to FFmpeg; %s", "Fetch snapshot image failed from %s, falling back to FFmpeg; %s",
self._name, self.device.name,
error, error,
) )
return None return None
image = await self.hass.async_add_job(fetch) image = await self.hass.async_add_executor_job(fetch)
if image is None: if image is None:
# Don't keep trying the snapshot URL
self._snapshot = None
ffmpeg = ImageFrame(self.hass.data[DATA_FFMPEG].binary, loop=self.hass.loop) ffmpeg = ImageFrame(self.hass.data[DATA_FFMPEG].binary, loop=self.hass.loop)
image = await asyncio.shield( image = await asyncio.shield(
ffmpeg.get_image( ffmpeg.get_image(
self._input, self._stream_uri,
output_format=IMAGE_JPEG, output_format=IMAGE_JPEG,
extra_cmd=self._ffmpeg_arguments, extra_cmd=self.device.config_entry.options.get(
CONF_EXTRA_ARGUMENTS
),
) )
) )
@ -500,12 +160,15 @@ class ONVIFHassCamera(Camera):
async def handle_async_mjpeg_stream(self, request): async def handle_async_mjpeg_stream(self, request):
"""Generate an HTTP MJPEG stream from the camera.""" """Generate an HTTP MJPEG stream from the camera."""
LOGGER.debug("Handling mjpeg stream from camera '%s'", self._name) LOGGER.debug("Handling mjpeg stream from camera '%s'", self.device.name)
ffmpeg_manager = self.hass.data[DATA_FFMPEG] ffmpeg_manager = self.hass.data[DATA_FFMPEG]
stream = CameraMjpeg(ffmpeg_manager.binary, loop=self.hass.loop) stream = CameraMjpeg(ffmpeg_manager.binary, loop=self.hass.loop)
await stream.open_camera(self._input, extra_cmd=self._ffmpeg_arguments) await stream.open_camera(
self._stream_uri,
extra_cmd=self.device.config_entry.options.get(CONF_EXTRA_ARGUMENTS),
)
try: try:
stream_reader = await stream.get_reader() stream_reader = await stream.get_reader()
@ -518,36 +181,26 @@ class ONVIFHassCamera(Camera):
finally: finally:
await stream.close() await stream.close()
@property async def async_perform_ptz(
def supported_features(self): self,
"""Return supported features.""" distance,
if self._input: speed,
return SUPPORT_STREAM move_mode,
return 0 continuous_duration,
preset,
async def stream_source(self): pan=None,
"""Return the stream source.""" tilt=None,
return self._input zoom=None,
) -> None:
@property """Perform a PTZ action on the camera."""
def name(self): await self.device.async_perform_ptz(
"""Return the name of this camera.""" self.profile,
return f"{self._name} - {self._profile_name}" distance,
speed,
@property move_mode,
def unique_id(self) -> Optional[str]: continuous_duration,
"""Return a unique ID.""" preset,
if self._profile_index: pan,
return f"{self._mac}_{self._profile_index}" tilt,
return self._mac zoom,
)
@property
def device_info(self):
"""Return a device description for device registry."""
return {
"identifiers": {(DOMAIN, self._mac)},
"name": self._name,
"manufacturer": self._manufacturer,
"model": self._model,
"sw_version": self._firmware_version,
}

View file

@ -1,16 +1,13 @@
"""Config flow for ONVIF.""" """Config flow for ONVIF."""
import os
from pprint import pformat from pprint import pformat
from typing import List from typing import List
from urllib.parse import urlparse from urllib.parse import urlparse
import onvif from onvif.exceptions import ONVIFError
from onvif import ONVIFCamera, exceptions
import voluptuous as vol import voluptuous as vol
from wsdiscovery.discovery import ThreadedWSDiscovery as WSDiscovery from wsdiscovery.discovery import ThreadedWSDiscovery as WSDiscovery
from wsdiscovery.scope import Scope from wsdiscovery.scope import Scope
from wsdiscovery.service import Service from wsdiscovery.service import Service
from zeep.asyncio import AsyncTransport
from zeep.exceptions import Fault from zeep.exceptions import Fault
from homeassistant import config_entries from homeassistant import config_entries
@ -23,12 +20,10 @@ from homeassistant.const import (
CONF_USERNAME, CONF_USERNAME,
) )
from homeassistant.core import callback from homeassistant.core import callback
from homeassistant.helpers.aiohttp_client import async_get_clientsession
# pylint: disable=unused-import # pylint: disable=unused-import
from .const import ( from .const import (
CONF_DEVICE_ID, CONF_DEVICE_ID,
CONF_PROFILE,
CONF_RTSP_TRANSPORT, CONF_RTSP_TRANSPORT,
DEFAULT_ARGUMENTS, DEFAULT_ARGUMENTS,
DEFAULT_PORT, DEFAULT_PORT,
@ -36,6 +31,7 @@ from .const import (
LOGGER, LOGGER,
RTSP_TRANS_PROTOCOLS, RTSP_TRANS_PROTOCOLS,
) )
from .device import get_device
CONF_MANUAL_INPUT = "Manually configure ONVIF device" CONF_MANUAL_INPUT = "Manually configure ONVIF device"
@ -219,23 +215,21 @@ class OnvifFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
} }
) )
if not self.onvif_config.get(CONF_PROFILE): # Verify there is an H264 profile
self.onvif_config[CONF_PROFILE] = [] media_service = device.create_media_service()
media_service = device.create_media_service() profiles = await media_service.GetProfiles()
profiles = await media_service.GetProfiles() h264 = any(
LOGGER.debug("Media Profiles %s", pformat(profiles)) profile.VideoEncoderConfiguration.Encoding == "H264"
for key, profile in enumerate(profiles): for profile in profiles
if profile.VideoEncoderConfiguration.Encoding != "H264": )
continue
self.onvif_config[CONF_PROFILE].append(key)
if not self.onvif_config[CONF_PROFILE]: if not h264:
return self.async_abort(reason="no_h264") return self.async_abort(reason="no_h264")
title = f"{self.onvif_config[CONF_NAME]} - {self.device_id}" title = f"{self.onvif_config[CONF_NAME]} - {self.device_id}"
return self.async_create_entry(title=title, data=self.onvif_config) return self.async_create_entry(title=title, data=self.onvif_config)
except exceptions.ONVIFError as err: except ONVIFError as err:
LOGGER.error( LOGGER.error(
"Couldn't setup ONVIF device '%s'. Error: %s", "Couldn't setup ONVIF device '%s'. Error: %s",
self.onvif_config[CONF_NAME], self.onvif_config[CONF_NAME],
@ -292,17 +286,3 @@ class OnvifOptionsFlowHandler(config_entries.OptionsFlow):
} }
), ),
) )
def get_device(hass, host, port, username, password) -> ONVIFCamera:
"""Get ONVIFCamera instance."""
session = async_get_clientsession(hass)
transport = AsyncTransport(None, session=session)
return ONVIFCamera(
host,
port,
username,
password,
f"{os.path.dirname(onvif.__file__)}/wsdl/",
transport=transport,
)

View file

@ -4,18 +4,14 @@ import logging
LOGGER = logging.getLogger(__package__) LOGGER = logging.getLogger(__package__)
DOMAIN = "onvif" DOMAIN = "onvif"
ONVIF_DATA = "onvif"
ENTITIES = "entities"
DEFAULT_NAME = "ONVIF Camera" DEFAULT_NAME = "ONVIF Camera"
DEFAULT_PORT = 5000 DEFAULT_PORT = 5000
DEFAULT_USERNAME = "admin" DEFAULT_USERNAME = "admin"
DEFAULT_PASSWORD = "888888" DEFAULT_PASSWORD = "888888"
DEFAULT_ARGUMENTS = "-pred 1" DEFAULT_ARGUMENTS = "-pred 1"
DEFAULT_PROFILE = 0
CONF_DEVICE_ID = "deviceid" CONF_DEVICE_ID = "deviceid"
CONF_PROFILE = "profile"
CONF_RTSP_TRANSPORT = "rtsp_transport" CONF_RTSP_TRANSPORT = "rtsp_transport"
RTSP_TRANS_PROTOCOLS = ["tcp", "udp", "udp_multicast", "http"] RTSP_TRANS_PROTOCOLS = ["tcp", "udp", "udp_multicast", "http"]
@ -44,4 +40,3 @@ ABSOLUTE_MOVE = "AbsoluteMove"
GOTOPRESET_MOVE = "GotoPreset" GOTOPRESET_MOVE = "GotoPreset"
SERVICE_PTZ = "ptz" SERVICE_PTZ = "ptz"
ENTITIES = "entities"

View file

@ -0,0 +1,399 @@
"""ONVIF device abstraction."""
import asyncio
import datetime as dt
import os
from typing import List
from aiohttp.client_exceptions import ClientConnectionError, ServerDisconnectedError
import onvif
from onvif import ONVIFCamera
from onvif.exceptions import ONVIFError
from zeep.asyncio import AsyncTransport
from zeep.exceptions import Fault
from homeassistant.const import (
CONF_HOST,
CONF_NAME,
CONF_PASSWORD,
CONF_PORT,
CONF_USERNAME,
)
from homeassistant.helpers.aiohttp_client import async_get_clientsession
import homeassistant.util.dt as dt_util
from .const import (
ABSOLUTE_MOVE,
CONTINUOUS_MOVE,
GOTOPRESET_MOVE,
LOGGER,
PAN_FACTOR,
RELATIVE_MOVE,
TILT_FACTOR,
ZOOM_FACTOR,
)
from .models import PTZ, Capabilities, DeviceInfo, Profile, Resolution, Video
class ONVIFDevice:
"""Manages an ONVIF device."""
def __init__(self, hass, config_entry=None):
"""Initialize the device."""
self.hass = hass
self.config_entry = config_entry
self.available = True
self.device = None
self.info = DeviceInfo()
self.capabilities = Capabilities()
self.profiles = []
self.max_resolution = 0
@property
def name(self) -> str:
"""Return the name of this device."""
return self.config_entry.data[CONF_NAME]
@property
def host(self) -> str:
"""Return the host of this device."""
return self.config_entry.data[CONF_HOST]
@property
def port(self) -> int:
"""Return the port of this device."""
return self.config_entry.data[CONF_PORT]
@property
def username(self) -> int:
"""Return the username of this device."""
return self.config_entry.data[CONF_USERNAME]
@property
def password(self) -> int:
"""Return the password of this device."""
return self.config_entry.data[CONF_PASSWORD]
async def async_setup(self) -> bool:
"""Set up the device."""
self.device = get_device(
self.hass,
host=self.config_entry.data[CONF_HOST],
port=self.config_entry.data[CONF_PORT],
username=self.config_entry.data[CONF_USERNAME],
password=self.config_entry.data[CONF_PASSWORD],
)
# Get all device info
try:
await self.device.update_xaddrs()
await self.async_check_date_and_time()
self.info = await self.async_get_device_info()
self.capabilities = await self.async_get_capabilities()
self.profiles = await self.async_get_profiles()
if self.capabilities.ptz:
self.device.create_ptz_service()
# Determine max resolution from profiles
self.max_resolution = max(
profile.video.resolution.width
for profile in self.profiles
if profile.video.encoding == "H264"
)
except ClientConnectionError as err:
LOGGER.warning(
"Couldn't connect to camera '%s', but will retry later. Error: %s",
self.name,
err,
)
self.available = False
except Fault as err:
LOGGER.error(
"Couldn't connect to camera '%s', please verify "
"that the credentials are correct. Error: %s",
self.name,
err,
)
return False
return True
async def async_check_date_and_time(self) -> None:
"""Warns if device and system date not synced."""
LOGGER.debug("Setting up the ONVIF device management service")
devicemgmt = self.device.create_devicemgmt_service()
LOGGER.debug("Retrieving current device date/time")
try:
system_date = dt_util.utcnow()
device_time = await devicemgmt.GetSystemDateAndTime()
if not device_time:
LOGGER.debug(
"""Couldn't get device '%s' date/time.
GetSystemDateAndTime() return null/empty""",
self.name,
)
return
if device_time.UTCDateTime:
tzone = dt_util.UTC
cdate = device_time.UTCDateTime
else:
tzone = (
dt_util.get_time_zone(device_time.TimeZone)
or dt_util.DEFAULT_TIME_ZONE
)
cdate = device_time.LocalDateTime
if cdate is None:
LOGGER.warning("Could not retrieve date/time on this camera")
else:
cam_date = dt.datetime(
cdate.Date.Year,
cdate.Date.Month,
cdate.Date.Day,
cdate.Time.Hour,
cdate.Time.Minute,
cdate.Time.Second,
0,
tzone,
)
cam_date_utc = cam_date.astimezone(dt_util.UTC)
LOGGER.debug(
"Device date/time: %s | System date/time: %s",
cam_date_utc,
system_date,
)
dt_diff = cam_date - system_date
dt_diff_seconds = dt_diff.total_seconds()
if dt_diff_seconds > 5:
LOGGER.warning(
"The date/time on the device (UTC) is '%s', "
"which is different from the system '%s', "
"this could lead to authentication issues",
cam_date_utc,
system_date,
)
except ServerDisconnectedError as err:
LOGGER.warning(
"Couldn't get device '%s' date/time. Error: %s", self.name, err
)
async def async_get_device_info(self) -> DeviceInfo:
"""Obtain information about this device."""
devicemgmt = self.device.create_devicemgmt_service()
device_info = await devicemgmt.GetDeviceInformation()
return DeviceInfo(
device_info.Manufacturer,
device_info.Model,
device_info.FirmwareVersion,
self.config_entry.unique_id,
)
async def async_get_capabilities(self):
"""Obtain information about the available services on the device."""
media_service = self.device.create_media_service()
capabilities = await media_service.GetServiceCapabilities()
ptz = False
try:
self.device.get_definition("ptz")
ptz = True
except ONVIFError:
pass
return Capabilities(capabilities.SnapshotUri, ptz)
async def async_get_profiles(self) -> List[Profile]:
"""Obtain media profiles for this device."""
media_service = self.device.create_media_service()
result = await media_service.GetProfiles()
profiles = []
for key, onvif_profile in enumerate(result):
# Only add H264 profiles
if onvif_profile.VideoEncoderConfiguration.Encoding != "H264":
continue
profile = Profile(
key,
onvif_profile.token,
onvif_profile.Name,
Video(
onvif_profile.VideoEncoderConfiguration.Encoding,
Resolution(
onvif_profile.VideoEncoderConfiguration.Resolution.Width,
onvif_profile.VideoEncoderConfiguration.Resolution.Height,
),
),
)
# Configure PTZ options
if onvif_profile.PTZConfiguration:
profile.ptz = PTZ(
onvif_profile.PTZConfiguration.DefaultContinuousPanTiltVelocitySpace
is not None,
onvif_profile.PTZConfiguration.DefaultRelativePanTiltTranslationSpace
is not None,
onvif_profile.PTZConfiguration.DefaultAbsolutePantTiltPositionSpace
is not None,
)
ptz_service = self.device.get_service("ptz")
presets = await ptz_service.GetPresets(profile.token)
profile.ptz.presets = [preset.token for preset in presets]
profiles.append(profile)
return profiles
async def async_get_snapshot_uri(self, profile: Profile) -> str:
"""Get the snapshot URI for a specified profile."""
if not self.capabilities.snapshot:
return None
media_service = self.device.create_media_service()
req = media_service.create_type("GetSnapshotUri")
req.ProfileToken = profile.token
result = await media_service.GetSnapshotUri(req)
return result.Uri
async def async_get_stream_uri(self, profile: Profile) -> str:
"""Get the stream URI for a specified profile."""
media_service = self.device.create_media_service()
req = media_service.create_type("GetStreamUri")
req.ProfileToken = profile.token
req.StreamSetup = {
"Stream": "RTP-Unicast",
"Transport": {"Protocol": "RTSP"},
}
result = await media_service.GetStreamUri(req)
return result.Uri
async def async_perform_ptz(
self,
profile: Profile,
distance,
speed,
move_mode,
continuous_duration,
preset,
pan=None,
tilt=None,
zoom=None,
):
"""Perform a PTZ action on the camera."""
if not self.capabilities.ptz:
LOGGER.warning("PTZ actions are not supported on device '%s'", self.name)
return
ptz_service = self.device.get_service("ptz")
pan_val = distance * PAN_FACTOR.get(pan, 0)
tilt_val = distance * TILT_FACTOR.get(tilt, 0)
zoom_val = distance * ZOOM_FACTOR.get(zoom, 0)
speed_val = speed
preset_val = preset
LOGGER.debug(
"Calling %s PTZ | Pan = %4.2f | Tilt = %4.2f | Zoom = %4.2f | Speed = %4.2f | Preset = %s",
move_mode,
pan_val,
tilt_val,
zoom_val,
speed_val,
preset_val,
)
try:
req = ptz_service.create_type(move_mode)
req.ProfileToken = profile.token
if move_mode == CONTINUOUS_MOVE:
# Guard against unsupported operation
if not profile.ptz.continuous:
LOGGER.warning(
"ContinuousMove not supported on device '%s'", self.name
)
return
req.Velocity = {
"PanTilt": {"x": pan_val, "y": tilt_val},
"Zoom": {"x": zoom_val},
}
await ptz_service.ContinuousMove(req)
await asyncio.sleep(continuous_duration)
req = ptz_service.create_type("Stop")
req.ProfileToken = profile.token
await ptz_service.Stop({"ProfileToken": req.ProfileToken})
elif move_mode == RELATIVE_MOVE:
# Guard against unsupported operation
if not profile.ptz.relative:
LOGGER.warning(
"ContinuousMove not supported on device '%s'", self.name
)
return
req.Translation = {
"PanTilt": {"x": pan_val, "y": tilt_val},
"Zoom": {"x": zoom_val},
}
req.Speed = {
"PanTilt": {"x": speed_val, "y": speed_val},
"Zoom": {"x": speed_val},
}
await ptz_service.RelativeMove(req)
elif move_mode == ABSOLUTE_MOVE:
# Guard against unsupported operation
if not profile.ptz.absolute:
LOGGER.warning(
"ContinuousMove not supported on device '%s'", self.name
)
return
req.Position = {
"PanTilt": {"x": pan_val, "y": tilt_val},
"Zoom": {"x": zoom_val},
}
req.Speed = {
"PanTilt": {"x": speed_val, "y": speed_val},
"Zoom": {"x": speed_val},
}
await ptz_service.AbsoluteMove(req)
elif move_mode == GOTOPRESET_MOVE:
# Guard against unsupported operation
if preset_val not in profile.ptz.presets:
LOGGER.warning(
"PTZ preset '%s' does not exist on device '%s'. Available Presets: %s",
preset_val,
self.name,
profile.ptz.presets.join(", "),
)
return
req.PresetToken = preset_val
req.Speed = {
"PanTilt": {"x": speed_val, "y": speed_val},
"Zoom": {"x": speed_val},
}
await ptz_service.GotoPreset(req)
except ONVIFError as err:
if "Bad Request" in err.reason:
LOGGER.warning("Device '%s' doesn't support PTZ.", self.name)
else:
LOGGER.error("Error trying to perform PTZ action: %s", err)
def get_device(hass, host, port, username, password) -> ONVIFCamera:
"""Get ONVIFCamera instance."""
session = async_get_clientsession(hass)
transport = AsyncTransport(None, session=session)
return ONVIFCamera(
host,
port,
username,
password,
f"{os.path.dirname(onvif.__file__)}/wsdl/",
transport=transport,
)

View file

@ -0,0 +1,58 @@
"""ONVIF models."""
from dataclasses import dataclass
from typing import List
@dataclass
class DeviceInfo:
"""Represent device information."""
manufacturer: str = None
model: str = None
fw_version: str = None
mac: str = None
@dataclass
class Resolution:
"""Represent video resolution."""
width: int
height: int
@dataclass
class Video:
"""Represent video encoding settings."""
encoding: str
resolution: Resolution
@dataclass
class PTZ:
"""Represents PTZ configuration on a profile."""
continuous: bool
relative: bool
absolute: bool
presets: List[str] = None
@dataclass
class Profile:
"""Represent a ONVIF Profile."""
index: int
token: str
name: str
video: Video
ptz: PTZ = None
@dataclass
class Capabilities:
"""Represents Service capabilities."""
snapshot: bool = False
ptz: bool = False

View file

@ -36,10 +36,10 @@ DISCOVERY = [
] ]
def setup_mock_onvif_device( def setup_mock_onvif_camera(
mock_device, with_h264=True, two_profiles=False, with_interfaces=True mock_onvif_camera, with_h264=True, two_profiles=False, with_interfaces=True
): ):
"""Prepare mock ONVIF device.""" """Prepare mock onvif.ONVIFCamera."""
devicemgmt = MagicMock() devicemgmt = MagicMock()
interface = MagicMock() interface = MagicMock()
@ -61,10 +61,10 @@ def setup_mock_onvif_device(
media_service.GetProfiles.return_value = Future() media_service.GetProfiles.return_value = Future()
media_service.GetProfiles.return_value.set_result([profile1, profile2]) media_service.GetProfiles.return_value.set_result([profile1, profile2])
mock_device.update_xaddrs.return_value = Future() mock_onvif_camera.update_xaddrs.return_value = Future()
mock_device.update_xaddrs.return_value.set_result(True) mock_onvif_camera.update_xaddrs.return_value.set_result(True)
mock_device.create_devicemgmt_service = MagicMock(return_value=devicemgmt) mock_onvif_camera.create_devicemgmt_service = MagicMock(return_value=devicemgmt)
mock_device.create_media_service = MagicMock(return_value=media_service) mock_onvif_camera.create_media_service = MagicMock(return_value=media_service)
def mock_constructor( def mock_constructor(
host, host,
@ -78,9 +78,9 @@ def setup_mock_onvif_device(
transport=None, transport=None,
): ):
"""Fake the controller constructor.""" """Fake the controller constructor."""
return mock_device return mock_onvif_camera
mock_device.side_effect = mock_constructor mock_onvif_camera.side_effect = mock_constructor
def setup_mock_discovery( def setup_mock_discovery(
@ -114,16 +114,16 @@ def setup_mock_discovery(
mock_discovery.return_value = services mock_discovery.return_value = services
def setup_mock_camera(mock_camera): def setup_mock_device(mock_device):
"""Prepare mock HASS camera.""" """Prepare mock ONVIFDevice."""
mock_camera.async_initialize.return_value = Future() mock_device.async_setup.return_value = Future()
mock_camera.async_initialize.return_value.set_result(True) mock_device.async_setup.return_value.set_result(True)
def mock_constructor(hass, config): def mock_constructor(hass, config):
"""Fake the controller constructor.""" """Fake the controller constructor."""
return mock_camera return mock_device
mock_camera.side_effect = mock_constructor mock_device.side_effect = mock_constructor
async def setup_onvif_integration( async def setup_onvif_integration(
@ -137,7 +137,6 @@ async def setup_onvif_integration(
config_flow.CONF_PORT: PORT, config_flow.CONF_PORT: PORT,
config_flow.CONF_USERNAME: USERNAME, config_flow.CONF_USERNAME: USERNAME,
config_flow.CONF_PASSWORD: PASSWORD, config_flow.CONF_PASSWORD: PASSWORD,
config_flow.CONF_PROFILE: [0],
} }
config_entry = MockConfigEntry( config_entry = MockConfigEntry(
@ -153,15 +152,15 @@ async def setup_onvif_integration(
with patch( with patch(
"homeassistant.components.onvif.config_flow.get_device" "homeassistant.components.onvif.config_flow.get_device"
) as mock_device, patch( ) as mock_onvif_camera, patch(
"homeassistant.components.onvif.config_flow.wsdiscovery" "homeassistant.components.onvif.config_flow.wsdiscovery"
) as mock_discovery, patch( ) as mock_discovery, patch(
"homeassistant.components.onvif.camera.ONVIFHassCamera" "homeassistant.components.onvif.ONVIFDevice"
) as mock_camera: ) as mock_device:
setup_mock_onvif_device(mock_device, two_profiles=True) setup_mock_onvif_camera(mock_onvif_camera, two_profiles=True)
# no discovery # no discovery
mock_discovery.return_value = [] mock_discovery.return_value = []
setup_mock_camera(mock_camera) setup_mock_device(mock_device)
await hass.config_entries.async_setup(config_entry.entry_id) await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done() await hass.async_block_till_done()
return config_entry return config_entry
@ -179,14 +178,14 @@ async def test_flow_discovered_devices(hass):
with patch( with patch(
"homeassistant.components.onvif.config_flow.get_device" "homeassistant.components.onvif.config_flow.get_device"
) as mock_device, patch( ) as mock_onvif_camera, patch(
"homeassistant.components.onvif.config_flow.wsdiscovery" "homeassistant.components.onvif.config_flow.wsdiscovery"
) as mock_discovery, patch( ) as mock_discovery, patch(
"homeassistant.components.onvif.camera.ONVIFHassCamera" "homeassistant.components.onvif.ONVIFDevice"
) as mock_camera: ) as mock_device:
setup_mock_onvif_device(mock_device) setup_mock_onvif_camera(mock_onvif_camera)
setup_mock_discovery(mock_discovery) setup_mock_discovery(mock_discovery)
setup_mock_camera(mock_camera) setup_mock_device(mock_device)
result = await hass.config_entries.flow.async_configure( result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={} result["flow_id"], user_input={}
@ -221,7 +220,6 @@ async def test_flow_discovered_devices(hass):
config_flow.CONF_PORT: PORT, config_flow.CONF_PORT: PORT,
config_flow.CONF_USERNAME: USERNAME, config_flow.CONF_USERNAME: USERNAME,
config_flow.CONF_PASSWORD: PASSWORD, config_flow.CONF_PASSWORD: PASSWORD,
config_flow.CONF_PROFILE: [0],
} }
@ -238,14 +236,14 @@ async def test_flow_discovered_devices_ignore_configured_manual_input(hass):
with patch( with patch(
"homeassistant.components.onvif.config_flow.get_device" "homeassistant.components.onvif.config_flow.get_device"
) as mock_device, patch( ) as mock_onvif_camera, patch(
"homeassistant.components.onvif.config_flow.wsdiscovery" "homeassistant.components.onvif.config_flow.wsdiscovery"
) as mock_discovery, patch( ) as mock_discovery, patch(
"homeassistant.components.onvif.camera.ONVIFHassCamera" "homeassistant.components.onvif.ONVIFDevice"
) as mock_camera: ) as mock_device:
setup_mock_onvif_device(mock_device) setup_mock_onvif_camera(mock_onvif_camera)
setup_mock_discovery(mock_discovery, with_mac=True) setup_mock_discovery(mock_discovery, with_mac=True)
setup_mock_camera(mock_camera) setup_mock_device(mock_device)
result = await hass.config_entries.flow.async_configure( result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={} result["flow_id"], user_input={}
@ -289,14 +287,14 @@ async def test_flow_discovery_ignore_existing_and_abort(hass):
with patch( with patch(
"homeassistant.components.onvif.config_flow.get_device" "homeassistant.components.onvif.config_flow.get_device"
) as mock_device, patch( ) as mock_onvif_camera, patch(
"homeassistant.components.onvif.config_flow.wsdiscovery" "homeassistant.components.onvif.config_flow.wsdiscovery"
) as mock_discovery, patch( ) as mock_discovery, patch(
"homeassistant.components.onvif.camera.ONVIFHassCamera" "homeassistant.components.onvif.ONVIFDevice"
) as mock_camera: ) as mock_device:
setup_mock_onvif_device(mock_device) setup_mock_onvif_camera(mock_onvif_camera)
setup_mock_discovery(mock_discovery, with_name=True, with_mac=True) setup_mock_discovery(mock_discovery, with_name=True, with_mac=True)
setup_mock_camera(mock_camera) setup_mock_device(mock_device)
result = await hass.config_entries.flow.async_configure( result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={} result["flow_id"], user_input={}
@ -341,15 +339,15 @@ async def test_flow_manual_entry(hass):
with patch( with patch(
"homeassistant.components.onvif.config_flow.get_device" "homeassistant.components.onvif.config_flow.get_device"
) as mock_device, patch( ) as mock_onvif_camera, patch(
"homeassistant.components.onvif.config_flow.wsdiscovery" "homeassistant.components.onvif.config_flow.wsdiscovery"
) as mock_discovery, patch( ) as mock_discovery, patch(
"homeassistant.components.onvif.camera.ONVIFHassCamera" "homeassistant.components.onvif.ONVIFDevice"
) as mock_camera: ) as mock_device:
setup_mock_onvif_device(mock_device, two_profiles=True) setup_mock_onvif_camera(mock_onvif_camera, two_profiles=True)
# no discovery # no discovery
mock_discovery.return_value = [] mock_discovery.return_value = []
setup_mock_camera(mock_camera) setup_mock_device(mock_device)
result = await hass.config_entries.flow.async_configure( result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={}, result["flow_id"], user_input={},
@ -388,14 +386,15 @@ async def test_flow_manual_entry(hass):
config_flow.CONF_PORT: PORT, config_flow.CONF_PORT: PORT,
config_flow.CONF_USERNAME: USERNAME, config_flow.CONF_USERNAME: USERNAME,
config_flow.CONF_PASSWORD: PASSWORD, config_flow.CONF_PASSWORD: PASSWORD,
config_flow.CONF_PROFILE: [0, 1],
} }
async def test_flow_import_no_mac(hass): async def test_flow_import_no_mac(hass):
"""Test that config flow fails when no MAC available.""" """Test that config flow fails when no MAC available."""
with patch("homeassistant.components.onvif.config_flow.get_device") as mock_device: with patch(
setup_mock_onvif_device(mock_device, with_interfaces=False) "homeassistant.components.onvif.config_flow.get_device"
) as mock_onvif_camera:
setup_mock_onvif_camera(mock_onvif_camera, with_interfaces=False)
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
config_flow.DOMAIN, config_flow.DOMAIN,
@ -406,7 +405,6 @@ async def test_flow_import_no_mac(hass):
config_flow.CONF_PORT: PORT, config_flow.CONF_PORT: PORT,
config_flow.CONF_USERNAME: USERNAME, config_flow.CONF_USERNAME: USERNAME,
config_flow.CONF_PASSWORD: PASSWORD, config_flow.CONF_PASSWORD: PASSWORD,
config_flow.CONF_PROFILE: [0],
}, },
) )
@ -416,8 +414,10 @@ async def test_flow_import_no_mac(hass):
async def test_flow_import_no_h264(hass): async def test_flow_import_no_h264(hass):
"""Test that config flow fails when no MAC available.""" """Test that config flow fails when no MAC available."""
with patch("homeassistant.components.onvif.config_flow.get_device") as mock_device: with patch(
setup_mock_onvif_device(mock_device, with_h264=False) "homeassistant.components.onvif.config_flow.get_device"
) as mock_onvif_camera:
setup_mock_onvif_camera(mock_onvif_camera, with_h264=False)
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
config_flow.DOMAIN, config_flow.DOMAIN,
@ -437,9 +437,11 @@ async def test_flow_import_no_h264(hass):
async def test_flow_import_onvif_api_error(hass): async def test_flow_import_onvif_api_error(hass):
"""Test that config flow fails when ONVIF API fails.""" """Test that config flow fails when ONVIF API fails."""
with patch("homeassistant.components.onvif.config_flow.get_device") as mock_device: with patch(
setup_mock_onvif_device(mock_device) "homeassistant.components.onvif.config_flow.get_device"
mock_device.create_devicemgmt_service = MagicMock( ) as mock_onvif_camera:
setup_mock_onvif_camera(mock_onvif_camera)
mock_onvif_camera.create_devicemgmt_service = MagicMock(
side_effect=ONVIFError("Could not get device mgmt service") side_effect=ONVIFError("Could not get device mgmt service")
) )
@ -461,9 +463,11 @@ async def test_flow_import_onvif_api_error(hass):
async def test_flow_import_onvif_auth_error(hass): async def test_flow_import_onvif_auth_error(hass):
"""Test that config flow fails when ONVIF API fails.""" """Test that config flow fails when ONVIF API fails."""
with patch("homeassistant.components.onvif.config_flow.get_device") as mock_device: with patch(
setup_mock_onvif_device(mock_device) "homeassistant.components.onvif.config_flow.get_device"
mock_device.create_devicemgmt_service = MagicMock( ) as mock_onvif_camera:
setup_mock_onvif_camera(mock_onvif_camera)
mock_onvif_camera.create_devicemgmt_service = MagicMock(
side_effect=Fault("Auth Error") side_effect=Fault("Auth Error")
) )