Improve reliability of ONVIF subscription renewals (#92551)

* Improve reliablity of onvif subscription renewals

upstream changelog: https://github.com/hunterjm/python-onvif-zeep-async/compare/v2.0.0...v2.1.0

* ```
Traceback (most recent call last):
  File "/Users/bdraco/home-assistant/venv/lib/python3.10/site-packages/onvif/client.py", line 75, in _async_wrap_connection_error_retry
    return await func(*args, **kwargs)
  File "/Users/bdraco/home-assistant/homeassistant/components/onvif/event.py", line 441, in _async_call_pullpoint_subscription_renew
    await self._pullpoint_subscription.Renew(SUBSCRIPTION_RELATIVE_TIME)
  File "/Users/bdraco/home-assistant/venv/lib/python3.10/site-packages/zeep/proxy.py", line 64, in __call__
    return await self._proxy._binding.send_async(
  File "/Users/bdraco/home-assistant/venv/lib/python3.10/site-packages/zeep/wsdl/bindings/soap.py", line 156, in send_async
    response = await client.transport.post_xml(
  File "/Users/bdraco/home-assistant/venv/lib/python3.10/site-packages/zeep/transports.py", line 235, in post_xml
    response = await self.post(address, message, headers)
  File "/Users/bdraco/home-assistant/venv/lib/python3.10/site-packages/zeep/transports.py", line 220, in post
    response = await self.client.post(
  File "/Users/bdraco/home-assistant/venv/lib/python3.10/site-packages/httpx/_client.py", line 1845, in post
    return await self.request(
  File "/Users/bdraco/home-assistant/venv/lib/python3.10/site-packages/httpx/_client.py", line 1530, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/Users/bdraco/home-assistant/venv/lib/python3.10/site-packages/httpx/_client.py", line 1617, in send
    response = await self._send_handling_auth(
  File "/Users/bdraco/home-assistant/venv/lib/python3.10/site-packages/httpx/_client.py", line 1645, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/Users/bdraco/home-assistant/venv/lib/python3.10/site-packages/httpx/_client.py", line 1682, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/Users/bdraco/home-assistant/venv/lib/python3.10/site-packages/httpx/_client.py", line 1719, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/Users/bdraco/home-assistant/venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 352, in handle_async_request
    with map_httpcore_exceptions():
  File "/opt/homebrew/Cellar/python@3.10/3.10.10_1/Frameworks/Python.framework/Versions/3.10/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/Users/bdraco/home-assistant/venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
httpx.ReadTimeout
```

* adjust timeouts for slower tplink cameras

* tweak

* more debug

* tweak

* adjust message

* tweak

* Revert "tweak"

This reverts commit 10ee2a8de7.

* give time in seconds

* revert

* revert

* Update homeassistant/components/onvif/event.py

* Update homeassistant/components/onvif/event.py
This commit is contained in:
J. Nick Koston 2023-05-05 13:26:58 -05:00 committed by GitHub
parent 53e533af6b
commit 9a589a3a54
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 62 additions and 80 deletions

View file

@ -9,7 +9,7 @@ import datetime as dt
from aiohttp.web import Request from aiohttp.web import Request
from httpx import RemoteProtocolError, RequestError, TransportError from httpx import RemoteProtocolError, RequestError, TransportError
from onvif import ONVIFCamera, ONVIFService from onvif import ONVIFCamera, ONVIFService
from onvif.client import NotificationManager from onvif.client import NotificationManager, retry_connection_error
from onvif.exceptions import ONVIFError from onvif.exceptions import ONVIFError
from zeep.exceptions import Fault, ValidationError, XMLParseError from zeep.exceptions import Fault, ValidationError, XMLParseError
@ -40,8 +40,8 @@ SET_SYNCHRONIZATION_POINT_ERRORS = (*SUBSCRIPTION_ERRORS, TypeError)
UNSUBSCRIBE_ERRORS = (XMLParseError, *SUBSCRIPTION_ERRORS) UNSUBSCRIBE_ERRORS = (XMLParseError, *SUBSCRIPTION_ERRORS)
RENEW_ERRORS = (ONVIFError, RequestError, XMLParseError, *SUBSCRIPTION_ERRORS) RENEW_ERRORS = (ONVIFError, RequestError, XMLParseError, *SUBSCRIPTION_ERRORS)
# #
# We only keep the subscription alive for 3 minutes, and will keep # We only keep the subscription alive for 10 minutes, and will keep
# renewing it every 1.5 minutes. This is to avoid the camera # renewing it every 8 minutes. This is to avoid the camera
# accumulating subscriptions which will be impossible to clean up # accumulating subscriptions which will be impossible to clean up
# since ONVIF does not provide a way to list existing subscriptions. # since ONVIF does not provide a way to list existing subscriptions.
# #
@ -49,12 +49,25 @@ RENEW_ERRORS = (ONVIFError, RequestError, XMLParseError, *SUBSCRIPTION_ERRORS)
# sending events to us, and we will not be able to recover until # sending events to us, and we will not be able to recover until
# the subscriptions expire or the camera is rebooted. # the subscriptions expire or the camera is rebooted.
# #
SUBSCRIPTION_TIME = dt.timedelta(minutes=3) SUBSCRIPTION_TIME = dt.timedelta(minutes=10)
SUBSCRIPTION_RELATIVE_TIME = (
"PT3M" # use relative time since the time on the camera is not reliable # SUBSCRIPTION_RELATIVE_TIME uses a relative time since the time on the camera
) # is not reliable. We use 600 seconds (10 minutes) since some cameras cannot
SUBSCRIPTION_RENEW_INTERVAL = SUBSCRIPTION_TIME.total_seconds() / 2 # parse time in the format "PT10M" (10 minutes).
SUBSCRIPTION_RENEW_INTERVAL_ON_ERROR = 60.0 SUBSCRIPTION_RELATIVE_TIME = "PT600S"
# SUBSCRIPTION_RENEW_INTERVAL Must be less than the
# overall timeout of 90 * (SUBSCRIPTION_ATTEMPTS) 2 = 180 seconds
#
# We use 8 minutes between renewals to make sure we never hit the
# 10 minute limit even if the first renewal attempt fails
SUBSCRIPTION_RENEW_INTERVAL = 8 * 60
# The number of attempts to make when creating or renewing a subscription
SUBSCRIPTION_ATTEMPTS = 2
# The time to wait before trying to restart the subscription if it fails
SUBSCRIPTION_RESTART_INTERVAL_ON_ERROR = 60
PULLPOINT_POLL_TIME = dt.timedelta(seconds=60) PULLPOINT_POLL_TIME = dt.timedelta(seconds=60)
PULLPOINT_MESSAGE_LIMIT = 100 PULLPOINT_MESSAGE_LIMIT = 100
@ -327,20 +340,7 @@ class PullPointManager:
async def _async_start_pullpoint(self) -> bool: async def _async_start_pullpoint(self) -> bool:
"""Start pullpoint subscription.""" """Start pullpoint subscription."""
try: try:
try: started = await self._async_create_pullpoint_subscription()
started = await self._async_create_pullpoint_subscription()
except RequestError:
#
# We should only need to retry on RemoteProtocolError but some cameras
# are flaky and sometimes do not respond to the Renew request so we
# retry on RequestError as well.
#
# For RemoteProtocolError:
# http://datatracker.ietf.org/doc/html/rfc2616#section-8.1.4 allows the server
# to close the connection at any time, we treat this as a normal and try again
# once since we do not want to declare the camera as not supporting PullPoint
# if it just happened to close the connection at the wrong time.
started = await self._async_create_pullpoint_subscription()
except CREATE_ERRORS as err: except CREATE_ERRORS as err:
LOGGER.debug( LOGGER.debug(
"%s: Device does not support PullPoint service or has too many subscriptions: %s", "%s: Device does not support PullPoint service or has too many subscriptions: %s",
@ -372,16 +372,16 @@ class PullPointManager:
# scheduled when the current one is done if needed. # scheduled when the current one is done if needed.
return return
async with self._renew_lock: async with self._renew_lock:
next_attempt = SUBSCRIPTION_RENEW_INTERVAL_ON_ERROR next_attempt = SUBSCRIPTION_RESTART_INTERVAL_ON_ERROR
try: try:
if ( if await self._async_renew_pullpoint():
await self._async_renew_pullpoint()
or await self._async_restart_pullpoint()
):
next_attempt = SUBSCRIPTION_RENEW_INTERVAL next_attempt = SUBSCRIPTION_RENEW_INTERVAL
else:
await self._async_restart_pullpoint()
finally: finally:
self.async_schedule_pullpoint_renew(next_attempt) self.async_schedule_pullpoint_renew(next_attempt)
@retry_connection_error(SUBSCRIPTION_ATTEMPTS)
async def _async_create_pullpoint_subscription(self) -> bool: async def _async_create_pullpoint_subscription(self) -> bool:
"""Create pullpoint subscription.""" """Create pullpoint subscription."""
@ -447,6 +447,11 @@ class PullPointManager:
) )
self._pullpoint_subscription = None self._pullpoint_subscription = None
@retry_connection_error(SUBSCRIPTION_ATTEMPTS)
async def _async_call_pullpoint_subscription_renew(self) -> None:
"""Call PullPoint subscription Renew."""
await self._pullpoint_subscription.Renew(SUBSCRIPTION_RELATIVE_TIME)
async def _async_renew_pullpoint(self) -> bool: async def _async_renew_pullpoint(self) -> bool:
"""Renew the PullPoint subscription.""" """Renew the PullPoint subscription."""
if ( if (
@ -458,20 +463,7 @@ class PullPointManager:
# The first time we renew, we may get a Fault error so we # The first time we renew, we may get a Fault error so we
# suppress it. The subscription will be restarted in # suppress it. The subscription will be restarted in
# async_restart later. # async_restart later.
try: await self._async_call_pullpoint_subscription_renew()
await self._pullpoint_subscription.Renew(SUBSCRIPTION_RELATIVE_TIME)
except RequestError:
#
# We should only need to retry on RemoteProtocolError but some cameras
# are flaky and sometimes do not respond to the Renew request so we
# retry on RequestError as well.
#
# For RemoteProtocolError:
# http://datatracker.ietf.org/doc/html/rfc2616#section-8.1.4 allows the server
# to close the connection at any time, we treat this as a normal and try again
# once since we do not want to mark events as stale
# if it just happened to close the connection at the wrong time.
await self._pullpoint_subscription.Renew(SUBSCRIPTION_RELATIVE_TIME)
LOGGER.debug("%s: Renewed PullPoint subscription", self._name) LOGGER.debug("%s: Renewed PullPoint subscription", self._name)
return True return True
except RENEW_ERRORS as err: except RENEW_ERRORS as err:
@ -521,7 +513,7 @@ class PullPointManager:
stringify_onvif_error(err), stringify_onvif_error(err),
) )
return True return True
except (XMLParseError, *SUBSCRIPTION_ERRORS) as err: except Fault as err:
# Device may not support subscriptions so log at debug level # Device may not support subscriptions so log at debug level
# when we get an XMLParseError # when we get an XMLParseError
LOGGER.debug( LOGGER.debug(
@ -532,6 +524,16 @@ class PullPointManager:
# Treat errors as if the camera restarted. Assume that the pullpoint # Treat errors as if the camera restarted. Assume that the pullpoint
# subscription is no longer valid. # subscription is no longer valid.
return False return False
except (XMLParseError, RequestError, TimeoutError, TransportError) as err:
LOGGER.debug(
"%s: PullPoint subscription encountered an unexpected error and will be retried "
"(this is normal for some cameras): %s",
self._name,
stringify_onvif_error(err),
)
# Avoid renewing the subscription too often since it causes problems
# for some cameras, mainly the Tapo ones.
return True
if self.state != PullPointManagerState.STARTED: if self.state != PullPointManagerState.STARTED:
# If the webhook became started working during the long poll, # If the webhook became started working during the long poll,
@ -655,6 +657,7 @@ class WebHookManager:
self._renew_or_restart_job, self._renew_or_restart_job,
) )
@retry_connection_error(SUBSCRIPTION_ATTEMPTS)
async def _async_create_webhook_subscription(self) -> None: async def _async_create_webhook_subscription(self) -> None:
"""Create webhook subscription.""" """Create webhook subscription."""
LOGGER.debug( LOGGER.debug(
@ -689,20 +692,7 @@ class WebHookManager:
async def _async_start_webhook(self) -> bool: async def _async_start_webhook(self) -> bool:
"""Start webhook.""" """Start webhook."""
try: try:
try: await self._async_create_webhook_subscription()
await self._async_create_webhook_subscription()
except RequestError:
#
# We should only need to retry on RemoteProtocolError but some cameras
# are flaky and sometimes do not respond to the Renew request so we
# retry on RequestError as well.
#
# For RemoteProtocolError:
# http://datatracker.ietf.org/doc/html/rfc2616#section-8.1.4 allows the server
# to close the connection at any time, we treat this as a normal and try again
# once since we do not want to declare the camera as not supporting webhooks
# if it just happened to close the connection at the wrong time.
await self._async_create_webhook_subscription()
except CREATE_ERRORS as err: except CREATE_ERRORS as err:
self._event_manager.async_webhook_failed() self._event_manager.async_webhook_failed()
LOGGER.debug( LOGGER.debug(
@ -720,6 +710,12 @@ class WebHookManager:
await self._async_unsubscribe_webhook() await self._async_unsubscribe_webhook()
return await self._async_start_webhook() return await self._async_start_webhook()
@retry_connection_error(SUBSCRIPTION_ATTEMPTS)
async def _async_call_webhook_subscription_renew(self) -> None:
"""Call PullPoint subscription Renew."""
assert self._webhook_subscription is not None
await self._webhook_subscription.Renew(SUBSCRIPTION_RELATIVE_TIME)
async def _async_renew_webhook(self) -> bool: async def _async_renew_webhook(self) -> bool:
"""Renew webhook subscription.""" """Renew webhook subscription."""
if ( if (
@ -728,20 +724,7 @@ class WebHookManager:
): ):
return False return False
try: try:
try: await self._async_call_webhook_subscription_renew()
await self._webhook_subscription.Renew(SUBSCRIPTION_RELATIVE_TIME)
except RequestError:
#
# We should only need to retry on RemoteProtocolError but some cameras
# are flaky and sometimes do not respond to the Renew request so we
# retry on RequestError as well.
#
# For RemoteProtocolError:
# http://datatracker.ietf.org/doc/html/rfc2616#section-8.1.4 allows the server
# to close the connection at any time, we treat this as a normal and try again
# once since we do not want to mark events as stale
# if it just happened to close the connection at the wrong time.
await self._webhook_subscription.Renew(SUBSCRIPTION_RELATIVE_TIME)
LOGGER.debug("%s: Renewed Webhook subscription", self._name) LOGGER.debug("%s: Renewed Webhook subscription", self._name)
return True return True
except RENEW_ERRORS as err: except RENEW_ERRORS as err:
@ -765,13 +748,12 @@ class WebHookManager:
# scheduled when the current one is done if needed. # scheduled when the current one is done if needed.
return return
async with self._renew_lock: async with self._renew_lock:
next_attempt = SUBSCRIPTION_RENEW_INTERVAL_ON_ERROR next_attempt = SUBSCRIPTION_RESTART_INTERVAL_ON_ERROR
try: try:
if ( if await self._async_renew_webhook():
await self._async_renew_webhook()
or await self._async_restart_webhook()
):
next_attempt = SUBSCRIPTION_RENEW_INTERVAL next_attempt = SUBSCRIPTION_RENEW_INTERVAL
else:
await self._async_restart_webhook()
finally: finally:
self._async_schedule_webhook_renew(next_attempt) self._async_schedule_webhook_renew(next_attempt)

View file

@ -8,5 +8,5 @@
"documentation": "https://www.home-assistant.io/integrations/onvif", "documentation": "https://www.home-assistant.io/integrations/onvif",
"iot_class": "local_push", "iot_class": "local_push",
"loggers": ["onvif", "wsdiscovery", "zeep"], "loggers": ["onvif", "wsdiscovery", "zeep"],
"requirements": ["onvif-zeep-async==2.0.0", "WSDiscovery==2.0.0"] "requirements": ["onvif-zeep-async==2.1.1", "WSDiscovery==2.0.0"]
} }

View file

@ -34,7 +34,7 @@ def stringify_onvif_error(error: Exception) -> str:
message += f" (actor:{error.actor})" message += f" (actor:{error.actor})"
else: else:
message = str(error) message = str(error)
return message or "Device sent empty error" return message or f"Device sent empty error with type {type(error)}"
def is_auth_error(error: Exception) -> bool: def is_auth_error(error: Exception) -> bool:

View file

@ -1258,7 +1258,7 @@ ondilo==0.2.0
onkyo-eiscp==1.2.7 onkyo-eiscp==1.2.7
# homeassistant.components.onvif # homeassistant.components.onvif
onvif-zeep-async==2.0.0 onvif-zeep-async==2.1.1
# homeassistant.components.opengarage # homeassistant.components.opengarage
open-garage==0.2.0 open-garage==0.2.0

View file

@ -945,7 +945,7 @@ omnilogic==0.4.5
ondilo==0.2.0 ondilo==0.2.0
# homeassistant.components.onvif # homeassistant.components.onvif
onvif-zeep-async==2.0.0 onvif-zeep-async==2.1.1
# homeassistant.components.opengarage # homeassistant.components.opengarage
open-garage==0.2.0 open-garage==0.2.0