Add type annotations to amcrest integration (#54761)

Co-authored-by: Milan Meulemans <milan.meulemans@live.be>
This commit is contained in:
Sean Vig 2021-08-25 07:24:29 -04:00 committed by GitHub
parent bb42eb1176
commit 6b4e3bca6f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 282 additions and 165 deletions

View file

@ -15,6 +15,7 @@ homeassistant.components.alarm_control_panel.*
homeassistant.components.amazon_polly.*
homeassistant.components.ambee.*
homeassistant.components.ambient_station.*
homeassistant.components.amcrest.*
homeassistant.components.ampio.*
homeassistant.components.automation.*
homeassistant.components.binary_sensor.*

View file

@ -1,13 +1,18 @@
"""Support for Amcrest IP cameras."""
from __future__ import annotations
from contextlib import suppress
from datetime import timedelta
from dataclasses import dataclass
from datetime import datetime, timedelta
import logging
import threading
from typing import Any, Callable
import aiohttp
from amcrest import AmcrestError, ApiWrapper, LoginError
import voluptuous as vol
from homeassistant.auth.models import User
from homeassistant.auth.permissions.const import POLICY_CONTROL
from homeassistant.components.binary_sensor import DOMAIN as BINARY_SENSOR
from homeassistant.components.camera import DOMAIN as CAMERA
@ -27,12 +32,14 @@ from homeassistant.const import (
ENTITY_MATCH_NONE,
HTTP_BASIC_AUTHENTICATION,
)
from homeassistant.core import HomeAssistant, ServiceCall
from homeassistant.exceptions import Unauthorized, UnknownUser
from homeassistant.helpers import discovery
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.dispatcher import async_dispatcher_send, dispatcher_send
from homeassistant.helpers.event import track_time_interval
from homeassistant.helpers.service import async_extract_entity_ids
from homeassistant.helpers.typing import ConfigType
from .binary_sensor import BINARY_SENSOR_KEYS, BINARY_SENSORS, check_binary_sensors
from .camera import CAMERA_SERVICES, STREAM_SOURCE_LIST
@ -73,7 +80,7 @@ SCAN_INTERVAL = timedelta(seconds=10)
AUTHENTICATION_LIST = {"basic": "basic"}
def _has_unique_names(devices):
def _has_unique_names(devices: list[dict[str, Any]]) -> list[dict[str, Any]]:
names = [device[CONF_NAME] for device in devices]
vol.Schema(vol.Unique())(names)
return devices
@ -119,7 +126,15 @@ CONFIG_SCHEMA = vol.Schema(
class AmcrestChecker(ApiWrapper):
"""amcrest.ApiWrapper wrapper for catching errors."""
def __init__(self, hass, name, host, port, user, password):
def __init__(
self,
hass: HomeAssistant,
name: str,
host: str,
port: int,
user: str,
password: str,
) -> None:
"""Initialize."""
self._hass = hass
self._wrap_name = name
@ -128,7 +143,7 @@ class AmcrestChecker(ApiWrapper):
self._wrap_login_err = False
self._wrap_event_flag = threading.Event()
self._wrap_event_flag.set()
self._unsub_recheck = None
self._unsub_recheck: Callable[[], None] | None = None
super().__init__(
host,
port,
@ -139,23 +154,23 @@ class AmcrestChecker(ApiWrapper):
)
@property
def available(self):
def available(self) -> bool:
"""Return if camera's API is responding."""
return self._wrap_errors <= MAX_ERRORS and not self._wrap_login_err
@property
def available_flag(self):
def available_flag(self) -> threading.Event:
"""Return threading event flag that indicates if camera's API is responding."""
return self._wrap_event_flag
def _start_recovery(self):
def _start_recovery(self) -> None:
self._wrap_event_flag.clear()
dispatcher_send(self._hass, service_signal(SERVICE_UPDATE, self._wrap_name))
self._unsub_recheck = track_time_interval(
self._hass, self._wrap_test_online, RECHECK_INTERVAL
)
def command(self, *args, **kwargs):
def command(self, *args: Any, **kwargs: Any) -> Any:
"""amcrest.ApiWrapper.command wrapper to catch errors."""
try:
ret = super().command(*args, **kwargs)
@ -184,6 +199,7 @@ class AmcrestChecker(ApiWrapper):
self._wrap_errors = 0
self._wrap_login_err = False
if was_offline:
assert self._unsub_recheck is not None
self._unsub_recheck()
self._unsub_recheck = None
_LOGGER.error("%s camera back online", self._wrap_name)
@ -191,15 +207,19 @@ class AmcrestChecker(ApiWrapper):
dispatcher_send(self._hass, service_signal(SERVICE_UPDATE, self._wrap_name))
return ret
def _wrap_test_online(self, now):
def _wrap_test_online(self, now: datetime) -> None:
"""Test if camera is back online."""
_LOGGER.debug("Testing if %s back online", self._wrap_name)
with suppress(AmcrestError):
self.current_time # pylint: disable=pointless-statement
def _monitor_events(hass, name, api, event_codes):
event_codes = set(event_codes)
def _monitor_events(
hass: HomeAssistant,
name: str,
api: AmcrestChecker,
event_codes: set[str],
) -> None:
while True:
api.available_flag.wait()
try:
@ -220,7 +240,12 @@ def _monitor_events(hass, name, api, event_codes):
)
def _start_event_monitor(hass, name, api, event_codes):
def _start_event_monitor(
hass: HomeAssistant,
name: str,
api: AmcrestChecker,
event_codes: set[str],
) -> None:
thread = threading.Thread(
target=_monitor_events,
name=f"Amcrest {name}",
@ -230,14 +255,14 @@ def _start_event_monitor(hass, name, api, event_codes):
thread.start()
def setup(hass, config):
def setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the Amcrest IP Camera component."""
hass.data.setdefault(DATA_AMCREST, {DEVICES: {}, CAMERAS: []})
for device in config[DOMAIN]:
name = device[CONF_NAME]
username = device[CONF_USERNAME]
password = device[CONF_PASSWORD]
name: str = device[CONF_NAME]
username: str = device[CONF_USERNAME]
password: str = device[CONF_PASSWORD]
api = AmcrestChecker(
hass, name, device[CONF_HOST], device[CONF_PORT], username, password
@ -253,7 +278,9 @@ def setup(hass, config):
# currently aiohttp only works with basic authentication
# only valid for mjpeg streaming
if device[CONF_AUTHENTICATION] == HTTP_BASIC_AUTHENTICATION:
authentication = aiohttp.BasicAuth(username, password)
authentication: aiohttp.BasicAuth | None = aiohttp.BasicAuth(
username, password
)
else:
authentication = None
@ -268,7 +295,7 @@ def setup(hass, config):
discovery.load_platform(hass, CAMERA, DOMAIN, {CONF_NAME: name}, config)
event_codes = []
event_codes = set()
if binary_sensors:
discovery.load_platform(
hass,
@ -277,11 +304,13 @@ def setup(hass, config):
{CONF_NAME: name, CONF_BINARY_SENSORS: binary_sensors},
config,
)
event_codes = [
event_codes = {
sensor.event_code
for sensor in BINARY_SENSORS
if sensor.key in binary_sensors and not sensor.should_poll
]
if sensor.key in binary_sensors
and not sensor.should_poll
and sensor.event_code is not None
}
_start_event_monitor(hass, name, api, event_codes)
@ -293,10 +322,10 @@ def setup(hass, config):
if not hass.data[DATA_AMCREST][DEVICES]:
return False
def have_permission(user, entity_id):
def have_permission(user: User | None, entity_id: str) -> bool:
return not user or user.permissions.check_entity(entity_id, POLICY_CONTROL)
async def async_extract_from_service(call):
async def async_extract_from_service(call: ServiceCall) -> list[str]:
if call.context.user_id:
user = await hass.auth.async_get_user(call.context.user_id)
if user is None:
@ -327,7 +356,7 @@ def setup(hass, config):
entity_ids.append(entity_id)
return entity_ids
async def async_service_handler(call):
async def async_service_handler(call: ServiceCall) -> None:
args = []
for arg in CAMERA_SERVICES[call.service][2]:
args.append(call.data[arg])
@ -340,22 +369,13 @@ def setup(hass, config):
return True
@dataclass
class AmcrestDevice:
"""Representation of a base Amcrest discovery device."""
def __init__(
self,
api,
authentication,
ffmpeg_arguments,
stream_source,
resolution,
control_light,
):
"""Initialize the entity."""
self.api = api
self.authentication = authentication
self.ffmpeg_arguments = ffmpeg_arguments
self.stream_source = stream_source
self.resolution = resolution
self.control_light = control_light
api: AmcrestChecker
authentication: aiohttp.BasicAuth | None
ffmpeg_arguments: list[str]
stream_source: str
resolution: int
control_light: bool

View file

@ -5,6 +5,7 @@ from contextlib import suppress
from dataclasses import dataclass
from datetime import timedelta
import logging
from typing import TYPE_CHECKING, Callable
from amcrest import AmcrestError
import voluptuous as vol
@ -17,8 +18,10 @@ from homeassistant.components.binary_sensor import (
BinarySensorEntityDescription,
)
from homeassistant.const import CONF_BINARY_SENSORS, CONF_NAME
from homeassistant.core import callback
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from homeassistant.util import Throttle
from .const import (
@ -30,6 +33,9 @@ from .const import (
)
from .helpers import log_update_error, service_signal
if TYPE_CHECKING:
from . import AmcrestDevice
@dataclass
class AmcrestSensorEntityDescription(BinarySensorEntityDescription):
@ -117,7 +123,7 @@ _EXCLUSIVE_OPTIONS = [
_UPDATE_MSG = "Updating %s binary sensor"
def check_binary_sensors(value):
def check_binary_sensors(value: list[str]) -> list[str]:
"""Validate binary sensor configurations."""
for exclusive_options in _EXCLUSIVE_OPTIONS:
if len(set(value) & exclusive_options) > 1:
@ -127,7 +133,12 @@ def check_binary_sensors(value):
return value
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
async def async_setup_platform(
hass: HomeAssistant,
config: ConfigType,
async_add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up a binary sensor for an Amcrest IP Camera."""
if discovery_info is None:
return
@ -148,21 +159,27 @@ async def async_setup_platform(hass, config, async_add_entities, discovery_info=
class AmcrestBinarySensor(BinarySensorEntity):
"""Binary sensor for Amcrest camera."""
def __init__(self, name, device, entity_description):
def __init__(
self,
name: str,
device: AmcrestDevice,
entity_description: AmcrestSensorEntityDescription,
) -> None:
"""Initialize entity."""
self._signal_name = name
self._api = device.api
self.entity_description = entity_description
self.entity_description: AmcrestSensorEntityDescription = entity_description
self._attr_name = f"{name} {entity_description.name}"
self._attr_should_poll = entity_description.should_poll
self._unsub_dispatcher = []
self._unsub_dispatcher: list[Callable[[], None]] = []
@property
def available(self):
def available(self) -> bool:
"""Return True if entity is available."""
return self.entity_description.key == _ONLINE_KEY or self._api.available
def update(self):
def update(self) -> None:
"""Update entity."""
if self.entity_description.key == _ONLINE_KEY:
self._update_online()
@ -170,7 +187,7 @@ class AmcrestBinarySensor(BinarySensorEntity):
self._update_others()
@Throttle(_ONLINE_SCAN_INTERVAL)
def _update_online(self):
def _update_online(self) -> None:
if not (self._api.available or self.is_on):
return
_LOGGER.debug(_UPDATE_MSG, self.name)
@ -182,37 +199,41 @@ class AmcrestBinarySensor(BinarySensorEntity):
self._api.current_time # pylint: disable=pointless-statement
self._attr_is_on = self._api.available
def _update_others(self):
def _update_others(self) -> None:
if not self.available:
return
_LOGGER.debug(_UPDATE_MSG, self.name)
event_code = self.entity_description.event_code
if event_code is None:
_LOGGER.error("Binary sensor %s event code not set", self.name)
return
try:
self._attr_is_on = "channels" in self._api.event_channels_happened(
event_code
)
self._attr_is_on = len(self._api.event_channels_happened(event_code)) > 0
except AmcrestError as error:
log_update_error(_LOGGER, "update", self.name, "binary sensor", error)
async def async_on_demand_update(self):
async def async_on_demand_update(self) -> None:
"""Update state."""
if self.entity_description.key == _ONLINE_KEY:
_LOGGER.debug(_UPDATE_MSG, self.name)
self._attr_is_on = self._api.available
self.async_write_ha_state()
return
self.async_schedule_update_ha_state(True)
else:
self.async_schedule_update_ha_state(True)
@callback
def async_event_received(self, start):
def async_event_received(self, state: bool) -> None:
"""Update state from received event."""
_LOGGER.debug(_UPDATE_MSG, self.name)
self._attr_is_on = start
self._attr_is_on = state
self.async_write_ha_state()
async def async_added_to_hass(self):
async def async_added_to_hass(self) -> None:
"""Subscribe to signals."""
assert self.hass is not None
self._unsub_dispatcher.append(
async_dispatcher_connect(
self.hass,
@ -236,7 +257,7 @@ class AmcrestBinarySensor(BinarySensorEntity):
)
)
async def async_will_remove_from_hass(self):
async def async_will_remove_from_hass(self) -> None:
"""Disconnect from update signal."""
for unsub_dispatcher in self._unsub_dispatcher:
unsub_dispatcher()

View file

@ -5,14 +5,17 @@ import asyncio
from datetime import timedelta
from functools import partial
import logging
from typing import TYPE_CHECKING, Any, Callable
from aiohttp import web
from amcrest import AmcrestError
from haffmpeg.camera import CameraMjpeg
import voluptuous as vol
from homeassistant.components.camera import SUPPORT_ON_OFF, SUPPORT_STREAM, Camera
from homeassistant.components.ffmpeg import DATA_FFMPEG
from homeassistant.components.ffmpeg import DATA_FFMPEG, FFmpegManager
from homeassistant.const import ATTR_ENTITY_ID, CONF_NAME, STATE_OFF, STATE_ON
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import (
async_aiohttp_proxy_stream,
async_aiohttp_proxy_web,
@ -20,6 +23,8 @@ from homeassistant.helpers.aiohttp_client import (
)
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from .const import (
CAMERA_WEB_SESSION_TIMEOUT,
@ -32,6 +37,9 @@ from .const import (
)
from .helpers import log_update_error, service_signal
if TYPE_CHECKING:
from . import AmcrestDevice
_LOGGER = logging.getLogger(__name__)
SCAN_INTERVAL = timedelta(seconds=15)
@ -112,7 +120,12 @@ CAMERA_SERVICES = {
_BOOL_TO_STATE = {True: STATE_ON, False: STATE_OFF}
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
async def async_setup_platform(
hass: HomeAssistant,
config: ConfigType,
async_add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up an Amcrest IP Camera."""
if discovery_info is None:
return
@ -133,7 +146,7 @@ class AmcrestCommandFailed(Exception):
class AmcrestCam(Camera):
"""An implementation of an Amcrest IP camera."""
def __init__(self, name, device, ffmpeg):
def __init__(self, name: str, device: AmcrestDevice, ffmpeg: FFmpegManager) -> None:
"""Initialize an Amcrest camera."""
super().__init__()
self._name = name
@ -144,19 +157,19 @@ class AmcrestCam(Camera):
self._resolution = device.resolution
self._token = self._auth = device.authentication
self._control_light = device.control_light
self._is_recording = False
self._motion_detection_enabled = None
self._brand = None
self._model = None
self._audio_enabled = None
self._motion_recording_enabled = None
self._color_bw = None
self._rtsp_url = None
self._snapshot_task = None
self._unsub_dispatcher = []
self._is_recording: bool = False
self._motion_detection_enabled: bool = False
self._brand: str | None = None
self._model: str | None = None
self._audio_enabled: bool | None = None
self._motion_recording_enabled: bool | None = None
self._color_bw: str | None = None
self._rtsp_url: str | None = None
self._snapshot_task: asyncio.tasks.Task | None = None
self._unsub_dispatcher: list[Callable[[], None]] = []
self._update_succeeded = False
def _check_snapshot_ok(self):
def _check_snapshot_ok(self) -> None:
available = self.available
if not available or not self.is_on:
_LOGGER.warning(
@ -166,7 +179,8 @@ class AmcrestCam(Camera):
)
raise CannotSnapshot
async def _async_get_image(self):
async def _async_get_image(self) -> None:
assert self.hass is not None
try:
# Send the request to snap a picture and return raw jpg data
# Snapshot command needs a much longer read timeout than other commands.
@ -179,7 +193,7 @@ class AmcrestCam(Camera):
)
except AmcrestError as error:
log_update_error(_LOGGER, "get image from", self.name, "camera", error)
return None
return
finally:
self._snapshot_task = None
@ -187,6 +201,7 @@ class AmcrestCam(Camera):
self, width: int | None = None, height: int | None = None
) -> bytes | None:
"""Return a still image response from the camera."""
assert self.hass is not None
_LOGGER.debug("Take snapshot from %s", self._name)
try:
# Amcrest cameras only support one snapshot command at a time.
@ -207,8 +222,11 @@ class AmcrestCam(Camera):
except CannotSnapshot:
return None
async def handle_async_mjpeg_stream(self, request):
async def handle_async_mjpeg_stream(
self, request: web.Request
) -> web.StreamResponse | None:
"""Return an MJPEG stream."""
assert self.hass is not None
# The snapshot implementation is handled by the parent class
if self._stream_source == "snapshot":
return await super().handle_async_mjpeg_stream(request)
@ -232,7 +250,7 @@ class AmcrestCam(Camera):
return await async_aiohttp_proxy_web(self.hass, request, stream_coro)
# streaming via ffmpeg
assert self._rtsp_url is not None
streaming_url = self._rtsp_url
stream = CameraMjpeg(self._ffmpeg.binary)
await stream.open_camera(streaming_url, extra_cmd=self._ffmpeg_arguments)
@ -259,12 +277,12 @@ class AmcrestCam(Camera):
return True
@property
def name(self):
def name(self) -> str:
"""Return the name of this camera."""
return self._name
@property
def extra_state_attributes(self):
def extra_state_attributes(self) -> dict[str, Any]:
"""Return the Amcrest-specific camera state attributes."""
attr = {}
if self._audio_enabled is not None:
@ -278,78 +296,80 @@ class AmcrestCam(Camera):
return attr
@property
def available(self):
def available(self) -> bool:
"""Return True if entity is available."""
return self._api.available
@property
def supported_features(self):
def supported_features(self) -> int:
"""Return supported features."""
return SUPPORT_ON_OFF | SUPPORT_STREAM
# Camera property overrides
@property
def is_recording(self):
def is_recording(self) -> bool:
"""Return true if the device is recording."""
return self._is_recording
@property
def brand(self):
def brand(self) -> str | None:
"""Return the camera brand."""
return self._brand
@property
def motion_detection_enabled(self):
def motion_detection_enabled(self) -> bool:
"""Return the camera motion detection status."""
return self._motion_detection_enabled
@property
def model(self):
def model(self) -> str | None:
"""Return the camera model."""
return self._model
async def stream_source(self):
async def stream_source(self) -> str | None:
"""Return the source of the stream."""
return self._rtsp_url
@property
def is_on(self):
def is_on(self) -> bool:
"""Return true if on."""
return self.is_streaming
# Other Entity method overrides
async def async_on_demand_update(self):
async def async_on_demand_update(self) -> None:
"""Update state."""
self.async_schedule_update_ha_state(True)
async def async_added_to_hass(self):
async def async_added_to_hass(self) -> None:
"""Subscribe to signals and add camera to list."""
for service, params in CAMERA_SERVICES.items():
self._unsub_dispatcher.append(
async_dispatcher_connect(
self.hass,
service_signal(service, self.entity_id),
getattr(self, params[1]),
)
assert self.hass is not None
self._unsub_dispatcher.extend(
async_dispatcher_connect(
self.hass,
service_signal(service, self.entity_id),
getattr(self, callback_name),
)
for service, (_, callback_name, _) in CAMERA_SERVICES.items()
)
self._unsub_dispatcher.append(
async_dispatcher_connect(
self.hass,
service_signal(SERVICE_UPDATE, self._name),
service_signal(SERVICE_UPDATE, self.name),
self.async_on_demand_update,
)
)
self.hass.data[DATA_AMCREST][CAMERAS].append(self.entity_id)
async def async_will_remove_from_hass(self):
async def async_will_remove_from_hass(self) -> None:
"""Remove camera from list and disconnect from signals."""
assert self.hass is not None
self.hass.data[DATA_AMCREST][CAMERAS].remove(self.entity_id)
for unsub_dispatcher in self._unsub_dispatcher:
unsub_dispatcher()
def update(self):
def update(self) -> None:
"""Update entity status."""
if not self.available or self._update_succeeded:
if not self.available:
@ -388,66 +408,77 @@ class AmcrestCam(Camera):
# Other Camera method overrides
def turn_off(self):
def turn_off(self) -> None:
"""Turn off camera."""
self._enable_video(False)
def turn_on(self):
def turn_on(self) -> None:
"""Turn on camera."""
self._enable_video(True)
def enable_motion_detection(self):
def enable_motion_detection(self) -> None:
"""Enable motion detection in the camera."""
self._enable_motion_detection(True)
def disable_motion_detection(self):
def disable_motion_detection(self) -> None:
"""Disable motion detection in camera."""
self._enable_motion_detection(False)
# Additional Amcrest Camera service methods
async def async_enable_recording(self):
async def async_enable_recording(self) -> None:
"""Call the job and enable recording."""
assert self.hass is not None
await self.hass.async_add_executor_job(self._enable_recording, True)
async def async_disable_recording(self):
async def async_disable_recording(self) -> None:
"""Call the job and disable recording."""
assert self.hass is not None
await self.hass.async_add_executor_job(self._enable_recording, False)
async def async_enable_audio(self):
async def async_enable_audio(self) -> None:
"""Call the job and enable audio."""
assert self.hass is not None
await self.hass.async_add_executor_job(self._enable_audio, True)
async def async_disable_audio(self):
async def async_disable_audio(self) -> None:
"""Call the job and disable audio."""
assert self.hass is not None
await self.hass.async_add_executor_job(self._enable_audio, False)
async def async_enable_motion_recording(self):
async def async_enable_motion_recording(self) -> None:
"""Call the job and enable motion recording."""
assert self.hass is not None
await self.hass.async_add_executor_job(self._enable_motion_recording, True)
async def async_disable_motion_recording(self):
async def async_disable_motion_recording(self) -> None:
"""Call the job and disable motion recording."""
assert self.hass is not None
await self.hass.async_add_executor_job(self._enable_motion_recording, False)
async def async_goto_preset(self, preset):
async def async_goto_preset(self, preset: int) -> None:
"""Call the job and move camera to preset position."""
assert self.hass is not None
await self.hass.async_add_executor_job(self._goto_preset, preset)
async def async_set_color_bw(self, color_bw):
async def async_set_color_bw(self, color_bw: str) -> None:
"""Call the job and set camera color mode."""
assert self.hass is not None
await self.hass.async_add_executor_job(self._set_color_bw, color_bw)
async def async_start_tour(self):
async def async_start_tour(self) -> None:
"""Call the job and start camera tour."""
assert self.hass is not None
await self.hass.async_add_executor_job(self._start_tour, True)
async def async_stop_tour(self):
async def async_stop_tour(self) -> None:
"""Call the job and stop camera tour."""
assert self.hass is not None
await self.hass.async_add_executor_job(self._start_tour, False)
async def async_ptz_control(self, movement, travel_time):
async def async_ptz_control(self, movement: str, travel_time: float) -> None:
"""Move or zoom camera in specified direction."""
assert self.hass is not None
code = _ACTION[_MOV.index(movement)]
kwargs = {"code": code, "arg1": 0, "arg2": 0, "arg3": 0}
@ -471,11 +502,14 @@ class AmcrestCam(Camera):
# Methods to send commands to Amcrest camera and handle errors
def _change_setting(self, value, attr, description, action="set"):
def _change_setting(
self, value: str | bool, description: str, attr: str | None = None
) -> None:
func = description.replace(" ", "_")
description = f"camera {description} to {value}"
tries = 3
while True:
action = "set"
max_tries = 3
for tries in range(max_tries, 0, -1):
try:
getattr(self, f"_set_{func}")(value)
new_value = getattr(self, f"_get_{func}")()
@ -493,90 +527,94 @@ class AmcrestCam(Camera):
setattr(self, attr, new_value)
self.schedule_update_ha_state()
return
tries -= 1
def _get_video(self):
def _get_video(self) -> bool:
return self._api.video_enabled
def _set_video(self, enable):
def _set_video(self, enable: bool) -> None:
self._api.video_enabled = enable
def _enable_video(self, enable):
def _enable_video(self, enable: bool) -> None:
"""Enable or disable camera video stream."""
# Given the way the camera's state is determined by
# is_streaming and is_recording, we can't leave
# recording on if video stream is being turned off.
if self.is_recording and not enable:
self._enable_recording(False)
self._change_setting(enable, "is_streaming", "video")
self._change_setting(enable, "video", "is_streaming")
if self._control_light:
self._change_light()
def _get_recording(self):
def _get_recording(self) -> bool:
return self._api.record_mode == "Manual"
def _set_recording(self, enable):
def _set_recording(self, enable: bool) -> None:
rec_mode = {"Automatic": 0, "Manual": 1}
self._api.record_mode = rec_mode["Manual" if enable else "Automatic"]
# The property has a str type, but setter has int type, which causes mypy confusion
self._api.record_mode = rec_mode["Manual" if enable else "Automatic"] # type: ignore[assignment]
def _enable_recording(self, enable):
def _enable_recording(self, enable: bool) -> None:
"""Turn recording on or off."""
# Given the way the camera's state is determined by
# is_streaming and is_recording, we can't leave
# video stream off if recording is being turned on.
if not self.is_streaming and enable:
self._enable_video(True)
self._change_setting(enable, "_is_recording", "recording")
self._change_setting(enable, "recording", "_is_recording")
def _get_motion_detection(self):
def _get_motion_detection(self) -> bool:
return self._api.is_motion_detector_on()
def _set_motion_detection(self, enable):
self._api.motion_detection = str(enable).lower()
def _set_motion_detection(self, enable: bool) -> None:
# The property has a str type, but setter has bool type, which causes mypy confusion
self._api.motion_detection = enable # type: ignore[assignment]
def _enable_motion_detection(self, enable):
def _enable_motion_detection(self, enable: bool) -> None:
"""Enable or disable motion detection."""
self._change_setting(enable, "_motion_detection_enabled", "motion detection")
self._change_setting(enable, "motion detection", "_motion_detection_enabled")
def _get_audio(self):
def _get_audio(self) -> bool:
return self._api.audio_enabled
def _set_audio(self, enable):
def _set_audio(self, enable: bool) -> None:
self._api.audio_enabled = enable
def _enable_audio(self, enable):
def _enable_audio(self, enable: bool) -> None:
"""Enable or disable audio stream."""
self._change_setting(enable, "_audio_enabled", "audio")
self._change_setting(enable, "audio", "_audio_enabled")
if self._control_light:
self._change_light()
def _get_indicator_light(self):
return "true" in self._api.command(
"configManager.cgi?action=getConfig&name=LightGlobal"
).content.decode("utf-8")
def _get_indicator_light(self) -> bool:
return (
"true"
in self._api.command(
"configManager.cgi?action=getConfig&name=LightGlobal"
).content.decode()
)
def _set_indicator_light(self, enable):
def _set_indicator_light(self, enable: bool) -> None:
self._api.command(
f"configManager.cgi?action=setConfig&LightGlobal[0].Enable={str(enable).lower()}"
)
def _change_light(self):
def _change_light(self) -> None:
"""Enable or disable indicator light."""
self._change_setting(
self._audio_enabled or self.is_streaming, None, "indicator light"
self._audio_enabled or self.is_streaming, "indicator light"
)
def _get_motion_recording(self):
def _get_motion_recording(self) -> bool:
return self._api.is_record_on_motion_detection()
def _set_motion_recording(self, enable):
self._api.motion_recording = str(enable).lower()
def _set_motion_recording(self, enable: bool) -> None:
self._api.motion_recording = enable
def _enable_motion_recording(self, enable):
def _enable_motion_recording(self, enable: bool) -> None:
"""Enable or disable motion recording."""
self._change_setting(enable, "_motion_recording_enabled", "motion recording")
self._change_setting(enable, "motion recording", "_motion_recording_enabled")
def _goto_preset(self, preset):
def _goto_preset(self, preset: int) -> None:
"""Move camera position and zoom to preset."""
try:
self._api.go_to_preset(preset_point_number=preset)
@ -585,17 +623,17 @@ class AmcrestCam(Camera):
_LOGGER, "move", self.name, f"camera to preset {preset}", error
)
def _get_color_mode(self):
def _get_color_mode(self) -> str:
return _CBW[self._api.day_night_color]
def _set_color_mode(self, cbw):
def _set_color_mode(self, cbw: str) -> None:
self._api.day_night_color = _CBW.index(cbw)
def _set_color_bw(self, cbw):
def _set_color_bw(self, cbw: str) -> None:
"""Set camera color mode."""
self._change_setting(cbw, "_color_bw", "color mode")
self._change_setting(cbw, "color mode", "_color_bw")
def _start_tour(self, start):
def _start_tour(self, start: bool) -> None:
"""Start camera tour."""
try:
self._api.tour(start=start)

View file

@ -1,15 +1,24 @@
"""Helpers for amcrest component."""
from __future__ import annotations
import logging
from .const import DOMAIN
def service_signal(service, *args):
def service_signal(service: str, *args: str) -> str:
"""Encode signal."""
return "_".join([DOMAIN, service, *args])
def log_update_error(logger, action, name, entity_type, error, level=logging.ERROR):
def log_update_error(
logger: logging.Logger,
action: str,
name: str | None,
entity_type: str,
error: Exception,
level: int = logging.ERROR,
) -> None:
"""Log an update error."""
logger.log(
level,

View file

@ -3,16 +3,23 @@ from __future__ import annotations
from datetime import timedelta
import logging
from typing import TYPE_CHECKING, Callable
from amcrest import AmcrestError
from homeassistant.components.sensor import SensorEntity, SensorEntityDescription
from homeassistant.const import CONF_NAME, CONF_SENSORS, PERCENTAGE
from homeassistant.core import HomeAssistant
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from .const import DATA_AMCREST, DEVICES, SENSOR_SCAN_INTERVAL_SECS, SERVICE_UPDATE
from .helpers import log_update_error, service_signal
if TYPE_CHECKING:
from . import AmcrestDevice
_LOGGER = logging.getLogger(__name__)
SCAN_INTERVAL = timedelta(seconds=SENSOR_SCAN_INTERVAL_SECS)
@ -37,7 +44,12 @@ SENSOR_TYPES: tuple[SensorEntityDescription, ...] = (
SENSOR_KEYS: list[str] = [desc.key for desc in SENSOR_TYPES]
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
async def async_setup_platform(
hass: HomeAssistant,
config: ConfigType,
async_add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up a sensor for an Amcrest IP Camera."""
if discovery_info is None:
return
@ -58,21 +70,24 @@ async def async_setup_platform(hass, config, async_add_entities, discovery_info=
class AmcrestSensor(SensorEntity):
"""A sensor implementation for Amcrest IP camera."""
def __init__(self, name, device, description: SensorEntityDescription):
def __init__(
self, name: str, device: AmcrestDevice, description: SensorEntityDescription
) -> None:
"""Initialize a sensor for Amcrest camera."""
self.entity_description = description
self._signal_name = name
self._api = device.api
self._unsub_dispatcher = None
self._unsub_dispatcher: Callable[[], None] | None = None
self._attr_name = f"{name} {description.name}"
self._attr_extra_state_attributes = {}
@property
def available(self):
def available(self) -> bool:
"""Return True if entity is available."""
return self._api.available
def update(self):
def update(self) -> None:
"""Get the latest data and updates the state."""
if not self.available:
return
@ -108,18 +123,20 @@ class AmcrestSensor(SensorEntity):
except AmcrestError as error:
log_update_error(_LOGGER, "update", self.name, "sensor", error)
async def async_on_demand_update(self):
async def async_on_demand_update(self) -> None:
"""Update state."""
self.async_schedule_update_ha_state(True)
async def async_added_to_hass(self):
async def async_added_to_hass(self) -> None:
"""Subscribe to update signal."""
assert self.hass is not None
self._unsub_dispatcher = async_dispatcher_connect(
self.hass,
service_signal(SERVICE_UPDATE, self._signal_name),
self.async_on_demand_update,
)
async def async_will_remove_from_hass(self):
async def async_will_remove_from_hass(self) -> None:
"""Disconnect from update signal."""
assert self._unsub_dispatcher is not None
self._unsub_dispatcher()

View file

@ -176,6 +176,17 @@ no_implicit_optional = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.amcrest.*]
check_untyped_defs = true
disallow_incomplete_defs = true
disallow_subclassing_any = true
disallow_untyped_calls = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
no_implicit_optional = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.ampio.*]
check_untyped_defs = true
disallow_incomplete_defs = true