Change Amcrest event monitor to non-async (#69640)

This commit is contained in:
Sean Vig 2022-05-04 11:15:52 -04:00 committed by Franck Nijhof
parent 0890f4e514
commit aa0335408a
No known key found for this signature in database
GPG key ID: D62583BA8AB11CA3

View file

@ -7,6 +7,7 @@ from contextlib import asynccontextmanager, suppress
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime, timedelta from datetime import datetime, timedelta
import logging import logging
import threading
from typing import Any from typing import Any
import aiohttp import aiohttp
@ -30,15 +31,14 @@ from homeassistant.const import (
CONF_USERNAME, CONF_USERNAME,
ENTITY_MATCH_ALL, ENTITY_MATCH_ALL,
ENTITY_MATCH_NONE, ENTITY_MATCH_NONE,
EVENT_HOMEASSISTANT_STOP,
HTTP_BASIC_AUTHENTICATION, HTTP_BASIC_AUTHENTICATION,
Platform, Platform,
) )
from homeassistant.core import Event, HomeAssistant, ServiceCall, callback from homeassistant.core import HomeAssistant, ServiceCall
from homeassistant.exceptions import Unauthorized, UnknownUser from homeassistant.exceptions import Unauthorized, UnknownUser
from homeassistant.helpers import discovery from homeassistant.helpers import discovery
import homeassistant.helpers.config_validation as cv import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.dispatcher import async_dispatcher_send from homeassistant.helpers.dispatcher import async_dispatcher_send, dispatcher_send
from homeassistant.helpers.event import async_track_time_interval from homeassistant.helpers.event import async_track_time_interval
from homeassistant.helpers.service import async_extract_entity_ids from homeassistant.helpers.service import async_extract_entity_ids
from homeassistant.helpers.typing import ConfigType from homeassistant.helpers.typing import ConfigType
@ -144,10 +144,13 @@ class AmcrestChecker(ApiWrapper):
self._hass = hass self._hass = hass
self._wrap_name = name self._wrap_name = name
self._wrap_errors = 0 self._wrap_errors = 0
self._wrap_lock = asyncio.Lock() self._wrap_lock = threading.Lock()
self._async_wrap_lock = asyncio.Lock()
self._wrap_login_err = False self._wrap_login_err = False
self._wrap_event_flag = asyncio.Event() self._wrap_event_flag = threading.Event()
self._wrap_event_flag.set() self._wrap_event_flag.set()
self._async_wrap_event_flag = asyncio.Event()
self._async_wrap_event_flag.set()
self._unsub_recheck: Callable[[], None] | None = None self._unsub_recheck: Callable[[], None] | None = None
super().__init__( super().__init__(
host, host,
@ -164,12 +167,18 @@ class AmcrestChecker(ApiWrapper):
return self._wrap_errors <= MAX_ERRORS and not self._wrap_login_err return self._wrap_errors <= MAX_ERRORS and not self._wrap_login_err
@property @property
def available_flag(self) -> asyncio.Event: def available_flag(self) -> threading.Event:
"""Return event flag that indicates if camera's API is responding.""" """Return event flag that indicates if camera's API is responding."""
return self._wrap_event_flag return self._wrap_event_flag
@property
def async_available_flag(self) -> asyncio.Event:
"""Return event flag that indicates if camera's API is responding."""
return self._async_wrap_event_flag
def _start_recovery(self) -> None: def _start_recovery(self) -> None:
self._wrap_event_flag.clear() self.available_flag.clear()
self.async_available_flag.clear()
async_dispatcher_send( async_dispatcher_send(
self._hass, service_signal(SERVICE_UPDATE, self._wrap_name) self._hass, service_signal(SERVICE_UPDATE, self._wrap_name)
) )
@ -177,9 +186,22 @@ class AmcrestChecker(ApiWrapper):
self._hass, self._wrap_test_online, RECHECK_INTERVAL self._hass, self._wrap_test_online, RECHECK_INTERVAL
) )
def command(self, *args: Any, **kwargs: Any) -> Any:
"""amcrest.ApiWrapper.command wrapper to catch errors."""
try:
ret = super().command(*args, **kwargs)
except LoginError as ex:
self._handle_offline(ex)
raise
except AmcrestError:
self._handle_error()
raise
self._set_online()
return ret
async def async_command(self, *args: Any, **kwargs: Any) -> httpx.Response: async def async_command(self, *args: Any, **kwargs: Any) -> httpx.Response:
"""amcrest.ApiWrapper.command wrapper to catch errors.""" """amcrest.ApiWrapper.command wrapper to catch errors."""
async with self._command_wrapper(): async with self._async_command_wrapper():
ret = await super().async_command(*args, **kwargs) ret = await super().async_command(*args, **kwargs)
return ret return ret
@ -188,16 +210,27 @@ class AmcrestChecker(ApiWrapper):
self, *args: Any, **kwargs: Any self, *args: Any, **kwargs: Any
) -> AsyncIterator[httpx.Response]: ) -> AsyncIterator[httpx.Response]:
"""amcrest.ApiWrapper.command wrapper to catch errors.""" """amcrest.ApiWrapper.command wrapper to catch errors."""
async with self._command_wrapper(): async with self._async_command_wrapper():
async with super().async_stream_command(*args, **kwargs) as ret: async with super().async_stream_command(*args, **kwargs) as ret:
yield ret yield ret
@asynccontextmanager @asynccontextmanager
async def _command_wrapper(self) -> AsyncIterator[None]: async def _async_command_wrapper(self) -> AsyncIterator[None]:
try: try:
yield yield
except LoginError as ex: except LoginError as ex:
async with self._wrap_lock: async with self._async_wrap_lock:
self._handle_offline(ex)
raise
except AmcrestError:
async with self._async_wrap_lock:
self._handle_error()
raise
async with self._async_wrap_lock:
self._set_online()
def _handle_offline(self, ex: Exception) -> None:
with self._wrap_lock:
was_online = self.available was_online = self.available
was_login_err = self._wrap_login_err was_login_err = self._wrap_login_err
self._wrap_login_err = True self._wrap_login_err = True
@ -205,9 +238,9 @@ class AmcrestChecker(ApiWrapper):
_LOGGER.error("%s camera offline: Login error: %s", self._wrap_name, ex) _LOGGER.error("%s camera offline: Login error: %s", self._wrap_name, ex)
if was_online: if was_online:
self._start_recovery() self._start_recovery()
raise
except AmcrestError: def _handle_error(self) -> None:
async with self._wrap_lock: with self._wrap_lock:
was_online = self.available was_online = self.available
errs = self._wrap_errors = self._wrap_errors + 1 errs = self._wrap_errors = self._wrap_errors + 1
offline = not self.available offline = not self.available
@ -215,8 +248,9 @@ class AmcrestChecker(ApiWrapper):
if was_online and offline: if was_online and offline:
_LOGGER.error("%s camera offline: Too many errors", self._wrap_name) _LOGGER.error("%s camera offline: Too many errors", self._wrap_name)
self._start_recovery() self._start_recovery()
raise
async with self._wrap_lock: def _set_online(self) -> None:
with self._wrap_lock:
was_offline = not self.available was_offline = not self.available
self._wrap_errors = 0 self._wrap_errors = 0
self._wrap_login_err = False self._wrap_login_err = False
@ -225,7 +259,8 @@ class AmcrestChecker(ApiWrapper):
self._unsub_recheck() self._unsub_recheck()
self._unsub_recheck = None self._unsub_recheck = None
_LOGGER.error("%s camera back online", self._wrap_name) _LOGGER.error("%s camera back online", self._wrap_name)
self._wrap_event_flag.set() self.available_flag.set()
self.async_available_flag.set()
async_dispatcher_send( async_dispatcher_send(
self._hass, service_signal(SERVICE_UPDATE, self._wrap_name) self._hass, service_signal(SERVICE_UPDATE, self._wrap_name)
) )
@ -237,18 +272,18 @@ class AmcrestChecker(ApiWrapper):
await self.async_current_time await self.async_current_time
async def _monitor_events( def _monitor_events(
hass: HomeAssistant, hass: HomeAssistant,
name: str, name: str,
api: AmcrestChecker, api: AmcrestChecker,
event_codes: set[str], event_codes: set[str],
) -> None: ) -> None:
while True: while True:
await api.available_flag.wait() api.available_flag.wait()
try: try:
async for code, payload in api.async_event_actions("All"): for code, payload in api.event_actions("All"):
event_data = {"camera": name, "event": code, "payload": payload} event_data = {"camera": name, "event": code, "payload": payload}
hass.bus.async_fire("amcrest", event_data) hass.bus.fire("amcrest", event_data)
if code in event_codes: if code in event_codes:
signal = service_signal(SERVICE_EVENT, name, code) signal = service_signal(SERVICE_EVENT, name, code)
start = any( start = any(
@ -256,18 +291,32 @@ async def _monitor_events(
for key, val in payload.items() for key, val in payload.items()
) )
_LOGGER.debug("Sending signal: '%s': %s", signal, start) _LOGGER.debug("Sending signal: '%s': %s", signal, start)
async_dispatcher_send(hass, signal, start) dispatcher_send(hass, signal, start)
except AmcrestError as error: except AmcrestError as error:
_LOGGER.warning( _LOGGER.warning(
"Error while processing events from %s camera: %r", name, error "Error while processing events from %s camera: %r", name, error
) )
def _start_event_monitor(
hass: HomeAssistant,
name: str,
api: AmcrestChecker,
event_codes: set[str],
) -> None:
thread = threading.Thread(
target=_monitor_events,
name=f"Amcrest {name}",
args=(hass, name, api, event_codes),
daemon=True,
)
thread.start()
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the Amcrest IP Camera component.""" """Set up the Amcrest IP Camera component."""
hass.data.setdefault(DATA_AMCREST, {DEVICES: {}, CAMERAS: []}) hass.data.setdefault(DATA_AMCREST, {DEVICES: {}, CAMERAS: []})
monitor_tasks = []
for device in config[DOMAIN]: for device in config[DOMAIN]:
name: str = device[CONF_NAME] name: str = device[CONF_NAME]
username: str = device[CONF_USERNAME] username: str = device[CONF_USERNAME]
@ -328,9 +377,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
and sensor.event_code is not None and sensor.event_code is not None
} }
monitor_tasks.append( _start_event_monitor(hass, name, api, event_codes)
asyncio.create_task(_monitor_events(hass, name, api, event_codes))
)
if sensors: if sensors:
hass.async_create_task( hass.async_create_task(
@ -354,13 +401,6 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
) )
) )
@callback
def cancel_monitors(event: Event) -> None:
for monitor_task in monitor_tasks:
monitor_task.cancel()
hass.bus.async_listen(EVENT_HOMEASSISTANT_STOP, cancel_monitors)
if not hass.data[DATA_AMCREST][DEVICES]: if not hass.data[DATA_AMCREST][DEVICES]:
return False return False