Change Amcrest event monitor to non-async (#69640)

This commit is contained in:
Sean Vig 2022-05-04 11:15:52 -04:00 committed by GitHub
parent d95113c8f2
commit 08770d015b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -7,6 +7,7 @@ from contextlib import asynccontextmanager, suppress
from dataclasses import dataclass
from datetime import datetime, timedelta
import logging
import threading
from typing import Any
import aiohttp
@ -30,15 +31,14 @@ from homeassistant.const import (
CONF_USERNAME,
ENTITY_MATCH_ALL,
ENTITY_MATCH_NONE,
EVENT_HOMEASSISTANT_STOP,
HTTP_BASIC_AUTHENTICATION,
Platform,
)
from homeassistant.core import Event, HomeAssistant, ServiceCall, callback
from homeassistant.core import HomeAssistant, ServiceCall
from homeassistant.exceptions import Unauthorized, UnknownUser
from homeassistant.helpers import discovery
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.helpers.dispatcher import async_dispatcher_send, dispatcher_send
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.helpers.service import async_extract_entity_ids
from homeassistant.helpers.typing import ConfigType
@ -144,10 +144,13 @@ class AmcrestChecker(ApiWrapper):
self._hass = hass
self._wrap_name = name
self._wrap_errors = 0
self._wrap_lock = asyncio.Lock()
self._wrap_lock = threading.Lock()
self._async_wrap_lock = asyncio.Lock()
self._wrap_login_err = False
self._wrap_event_flag = asyncio.Event()
self._wrap_event_flag = threading.Event()
self._wrap_event_flag.set()
self._async_wrap_event_flag = asyncio.Event()
self._async_wrap_event_flag.set()
self._unsub_recheck: Callable[[], None] | None = None
super().__init__(
host,
@ -164,12 +167,18 @@ class AmcrestChecker(ApiWrapper):
return self._wrap_errors <= MAX_ERRORS and not self._wrap_login_err
@property
def available_flag(self) -> asyncio.Event:
def available_flag(self) -> threading.Event:
"""Return event flag that indicates if camera's API is responding."""
return self._wrap_event_flag
@property
def async_available_flag(self) -> asyncio.Event:
"""Return event flag that indicates if camera's API is responding."""
return self._async_wrap_event_flag
def _start_recovery(self) -> None:
self._wrap_event_flag.clear()
self.available_flag.clear()
self.async_available_flag.clear()
async_dispatcher_send(
self._hass, service_signal(SERVICE_UPDATE, self._wrap_name)
)
@ -177,9 +186,22 @@ class AmcrestChecker(ApiWrapper):
self._hass, self._wrap_test_online, RECHECK_INTERVAL
)
def command(self, *args: Any, **kwargs: Any) -> Any:
"""amcrest.ApiWrapper.command wrapper to catch errors."""
try:
ret = super().command(*args, **kwargs)
except LoginError as ex:
self._handle_offline(ex)
raise
except AmcrestError:
self._handle_error()
raise
self._set_online()
return ret
async def async_command(self, *args: Any, **kwargs: Any) -> httpx.Response:
"""amcrest.ApiWrapper.command wrapper to catch errors."""
async with self._command_wrapper():
async with self._async_command_wrapper():
ret = await super().async_command(*args, **kwargs)
return ret
@ -188,35 +210,47 @@ class AmcrestChecker(ApiWrapper):
self, *args: Any, **kwargs: Any
) -> AsyncIterator[httpx.Response]:
"""amcrest.ApiWrapper.command wrapper to catch errors."""
async with self._command_wrapper():
async with self._async_command_wrapper():
async with super().async_stream_command(*args, **kwargs) as ret:
yield ret
@asynccontextmanager
async def _command_wrapper(self) -> AsyncIterator[None]:
async def _async_command_wrapper(self) -> AsyncIterator[None]:
try:
yield
except LoginError as ex:
async with self._wrap_lock:
was_online = self.available
was_login_err = self._wrap_login_err
self._wrap_login_err = True
if not was_login_err:
_LOGGER.error("%s camera offline: Login error: %s", self._wrap_name, ex)
if was_online:
self._start_recovery()
async with self._async_wrap_lock:
self._handle_offline(ex)
raise
except AmcrestError:
async with self._wrap_lock:
was_online = self.available
errs = self._wrap_errors = self._wrap_errors + 1
offline = not self.available
_LOGGER.debug("%s camera errs: %i", self._wrap_name, errs)
if was_online and offline:
_LOGGER.error("%s camera offline: Too many errors", self._wrap_name)
self._start_recovery()
async with self._async_wrap_lock:
self._handle_error()
raise
async with self._wrap_lock:
async with self._async_wrap_lock:
self._set_online()
def _handle_offline(self, ex: Exception) -> None:
with self._wrap_lock:
was_online = self.available
was_login_err = self._wrap_login_err
self._wrap_login_err = True
if not was_login_err:
_LOGGER.error("%s camera offline: Login error: %s", self._wrap_name, ex)
if was_online:
self._start_recovery()
def _handle_error(self) -> None:
with self._wrap_lock:
was_online = self.available
errs = self._wrap_errors = self._wrap_errors + 1
offline = not self.available
_LOGGER.debug("%s camera errs: %i", self._wrap_name, errs)
if was_online and offline:
_LOGGER.error("%s camera offline: Too many errors", self._wrap_name)
self._start_recovery()
def _set_online(self) -> None:
with self._wrap_lock:
was_offline = not self.available
self._wrap_errors = 0
self._wrap_login_err = False
@ -225,7 +259,8 @@ class AmcrestChecker(ApiWrapper):
self._unsub_recheck()
self._unsub_recheck = None
_LOGGER.error("%s camera back online", self._wrap_name)
self._wrap_event_flag.set()
self.available_flag.set()
self.async_available_flag.set()
async_dispatcher_send(
self._hass, service_signal(SERVICE_UPDATE, self._wrap_name)
)
@ -237,18 +272,18 @@ class AmcrestChecker(ApiWrapper):
await self.async_current_time
async def _monitor_events(
def _monitor_events(
hass: HomeAssistant,
name: str,
api: AmcrestChecker,
event_codes: set[str],
) -> None:
while True:
await api.available_flag.wait()
api.available_flag.wait()
try:
async for code, payload in api.async_event_actions("All"):
for code, payload in api.event_actions("All"):
event_data = {"camera": name, "event": code, "payload": payload}
hass.bus.async_fire("amcrest", event_data)
hass.bus.fire("amcrest", event_data)
if code in event_codes:
signal = service_signal(SERVICE_EVENT, name, code)
start = any(
@ -256,18 +291,32 @@ async def _monitor_events(
for key, val in payload.items()
)
_LOGGER.debug("Sending signal: '%s': %s", signal, start)
async_dispatcher_send(hass, signal, start)
dispatcher_send(hass, signal, start)
except AmcrestError as error:
_LOGGER.warning(
"Error while processing events from %s camera: %r", name, error
)
def _start_event_monitor(
hass: HomeAssistant,
name: str,
api: AmcrestChecker,
event_codes: set[str],
) -> None:
thread = threading.Thread(
target=_monitor_events,
name=f"Amcrest {name}",
args=(hass, name, api, event_codes),
daemon=True,
)
thread.start()
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the Amcrest IP Camera component."""
hass.data.setdefault(DATA_AMCREST, {DEVICES: {}, CAMERAS: []})
monitor_tasks = []
for device in config[DOMAIN]:
name: str = device[CONF_NAME]
username: str = device[CONF_USERNAME]
@ -328,9 +377,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
and sensor.event_code is not None
}
monitor_tasks.append(
asyncio.create_task(_monitor_events(hass, name, api, event_codes))
)
_start_event_monitor(hass, name, api, event_codes)
if sensors:
hass.async_create_task(
@ -354,13 +401,6 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
)
)
@callback
def cancel_monitors(event: Event) -> None:
for monitor_task in monitor_tasks:
monitor_task.cancel()
hass.bus.async_listen(EVENT_HOMEASSISTANT_STOP, cancel_monitors)
if not hass.data[DATA_AMCREST][DEVICES]:
return False