From 08770d015b5413bd56251f7376561148142ba5c2 Mon Sep 17 00:00:00 2001 From: Sean Vig Date: Wed, 4 May 2022 11:15:52 -0400 Subject: [PATCH] Change Amcrest event monitor to non-async (#69640) --- homeassistant/components/amcrest/__init__.py | 128 ++++++++++++------- 1 file changed, 84 insertions(+), 44 deletions(-) diff --git a/homeassistant/components/amcrest/__init__.py b/homeassistant/components/amcrest/__init__.py index f2472575259..e3f48263e8b 100644 --- a/homeassistant/components/amcrest/__init__.py +++ b/homeassistant/components/amcrest/__init__.py @@ -7,6 +7,7 @@ from contextlib import asynccontextmanager, suppress from dataclasses import dataclass from datetime import datetime, timedelta import logging +import threading from typing import Any import aiohttp @@ -30,15 +31,14 @@ from homeassistant.const import ( CONF_USERNAME, ENTITY_MATCH_ALL, ENTITY_MATCH_NONE, - EVENT_HOMEASSISTANT_STOP, HTTP_BASIC_AUTHENTICATION, Platform, ) -from homeassistant.core import Event, HomeAssistant, ServiceCall, callback +from homeassistant.core import HomeAssistant, ServiceCall from homeassistant.exceptions import Unauthorized, UnknownUser from homeassistant.helpers import discovery import homeassistant.helpers.config_validation as cv -from homeassistant.helpers.dispatcher import async_dispatcher_send +from homeassistant.helpers.dispatcher import async_dispatcher_send, dispatcher_send from homeassistant.helpers.event import async_track_time_interval from homeassistant.helpers.service import async_extract_entity_ids from homeassistant.helpers.typing import ConfigType @@ -144,10 +144,13 @@ class AmcrestChecker(ApiWrapper): self._hass = hass self._wrap_name = name self._wrap_errors = 0 - self._wrap_lock = asyncio.Lock() + self._wrap_lock = threading.Lock() + self._async_wrap_lock = asyncio.Lock() self._wrap_login_err = False - self._wrap_event_flag = asyncio.Event() + self._wrap_event_flag = threading.Event() self._wrap_event_flag.set() + self._async_wrap_event_flag = asyncio.Event() + self._async_wrap_event_flag.set() self._unsub_recheck: Callable[[], None] | None = None super().__init__( host, @@ -164,12 +167,18 @@ class AmcrestChecker(ApiWrapper): return self._wrap_errors <= MAX_ERRORS and not self._wrap_login_err @property - def available_flag(self) -> asyncio.Event: + def available_flag(self) -> threading.Event: """Return event flag that indicates if camera's API is responding.""" return self._wrap_event_flag + @property + def async_available_flag(self) -> asyncio.Event: + """Return event flag that indicates if camera's API is responding.""" + return self._async_wrap_event_flag + def _start_recovery(self) -> None: - self._wrap_event_flag.clear() + self.available_flag.clear() + self.async_available_flag.clear() async_dispatcher_send( self._hass, service_signal(SERVICE_UPDATE, self._wrap_name) ) @@ -177,9 +186,22 @@ class AmcrestChecker(ApiWrapper): self._hass, self._wrap_test_online, RECHECK_INTERVAL ) + def command(self, *args: Any, **kwargs: Any) -> Any: + """amcrest.ApiWrapper.command wrapper to catch errors.""" + try: + ret = super().command(*args, **kwargs) + except LoginError as ex: + self._handle_offline(ex) + raise + except AmcrestError: + self._handle_error() + raise + self._set_online() + return ret + async def async_command(self, *args: Any, **kwargs: Any) -> httpx.Response: """amcrest.ApiWrapper.command wrapper to catch errors.""" - async with self._command_wrapper(): + async with self._async_command_wrapper(): ret = await super().async_command(*args, **kwargs) return ret @@ -188,35 +210,47 @@ class AmcrestChecker(ApiWrapper): self, *args: Any, **kwargs: Any ) -> AsyncIterator[httpx.Response]: """amcrest.ApiWrapper.command wrapper to catch errors.""" - async with self._command_wrapper(): + async with self._async_command_wrapper(): async with super().async_stream_command(*args, **kwargs) as ret: yield ret @asynccontextmanager - async def _command_wrapper(self) -> AsyncIterator[None]: + async def _async_command_wrapper(self) -> AsyncIterator[None]: try: yield except LoginError as ex: - async with self._wrap_lock: - was_online = self.available - was_login_err = self._wrap_login_err - self._wrap_login_err = True - if not was_login_err: - _LOGGER.error("%s camera offline: Login error: %s", self._wrap_name, ex) - if was_online: - self._start_recovery() + async with self._async_wrap_lock: + self._handle_offline(ex) raise except AmcrestError: - async with self._wrap_lock: - was_online = self.available - errs = self._wrap_errors = self._wrap_errors + 1 - offline = not self.available - _LOGGER.debug("%s camera errs: %i", self._wrap_name, errs) - if was_online and offline: - _LOGGER.error("%s camera offline: Too many errors", self._wrap_name) - self._start_recovery() + async with self._async_wrap_lock: + self._handle_error() raise - async with self._wrap_lock: + async with self._async_wrap_lock: + self._set_online() + + def _handle_offline(self, ex: Exception) -> None: + with self._wrap_lock: + was_online = self.available + was_login_err = self._wrap_login_err + self._wrap_login_err = True + if not was_login_err: + _LOGGER.error("%s camera offline: Login error: %s", self._wrap_name, ex) + if was_online: + self._start_recovery() + + def _handle_error(self) -> None: + with self._wrap_lock: + was_online = self.available + errs = self._wrap_errors = self._wrap_errors + 1 + offline = not self.available + _LOGGER.debug("%s camera errs: %i", self._wrap_name, errs) + if was_online and offline: + _LOGGER.error("%s camera offline: Too many errors", self._wrap_name) + self._start_recovery() + + def _set_online(self) -> None: + with self._wrap_lock: was_offline = not self.available self._wrap_errors = 0 self._wrap_login_err = False @@ -225,7 +259,8 @@ class AmcrestChecker(ApiWrapper): self._unsub_recheck() self._unsub_recheck = None _LOGGER.error("%s camera back online", self._wrap_name) - self._wrap_event_flag.set() + self.available_flag.set() + self.async_available_flag.set() async_dispatcher_send( self._hass, service_signal(SERVICE_UPDATE, self._wrap_name) ) @@ -237,18 +272,18 @@ class AmcrestChecker(ApiWrapper): await self.async_current_time -async def _monitor_events( +def _monitor_events( hass: HomeAssistant, name: str, api: AmcrestChecker, event_codes: set[str], ) -> None: while True: - await api.available_flag.wait() + api.available_flag.wait() try: - async for code, payload in api.async_event_actions("All"): + for code, payload in api.event_actions("All"): event_data = {"camera": name, "event": code, "payload": payload} - hass.bus.async_fire("amcrest", event_data) + hass.bus.fire("amcrest", event_data) if code in event_codes: signal = service_signal(SERVICE_EVENT, name, code) start = any( @@ -256,18 +291,32 @@ async def _monitor_events( for key, val in payload.items() ) _LOGGER.debug("Sending signal: '%s': %s", signal, start) - async_dispatcher_send(hass, signal, start) + dispatcher_send(hass, signal, start) except AmcrestError as error: _LOGGER.warning( "Error while processing events from %s camera: %r", name, error ) +def _start_event_monitor( + hass: HomeAssistant, + name: str, + api: AmcrestChecker, + event_codes: set[str], +) -> None: + thread = threading.Thread( + target=_monitor_events, + name=f"Amcrest {name}", + args=(hass, name, api, event_codes), + daemon=True, + ) + thread.start() + + async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: """Set up the Amcrest IP Camera component.""" hass.data.setdefault(DATA_AMCREST, {DEVICES: {}, CAMERAS: []}) - monitor_tasks = [] for device in config[DOMAIN]: name: str = device[CONF_NAME] username: str = device[CONF_USERNAME] @@ -328,9 +377,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: and sensor.event_code is not None } - monitor_tasks.append( - asyncio.create_task(_monitor_events(hass, name, api, event_codes)) - ) + _start_event_monitor(hass, name, api, event_codes) if sensors: hass.async_create_task( @@ -354,13 +401,6 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: ) ) - @callback - def cancel_monitors(event: Event) -> None: - for monitor_task in monitor_tasks: - monitor_task.cancel() - - hass.bus.async_listen(EVENT_HOMEASSISTANT_STOP, cancel_monitors) - if not hass.data[DATA_AMCREST][DEVICES]: return False