Add type annotations for Netatmo (#52811)

This commit is contained in:
Tobias Sauerwein 2021-07-21 23:36:57 +02:00 committed by GitHub
parent 84c482441d
commit 583deada83
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 288 additions and 177 deletions

View file

@ -8,6 +8,7 @@ from datetime import timedelta
from itertools import islice
import logging
from time import time
from typing import Any
import pyatmo
@ -75,11 +76,11 @@ class NetatmoDataHandler:
self._auth = hass.data[DOMAIN][entry.entry_id][AUTH]
self.listeners: list[CALLBACK_TYPE] = []
self.data_classes: dict = {}
self.data = {}
self._queue = deque()
self.data: dict = {}
self._queue: deque = deque()
self._webhook: bool = False
async def async_setup(self):
async def async_setup(self) -> None:
"""Set up the Netatmo data handler."""
async_track_time_interval(
@ -94,7 +95,7 @@ class NetatmoDataHandler:
)
)
async def async_update(self, event_time):
async def async_update(self, event_time: timedelta) -> None:
"""
Update device.
@ -115,17 +116,17 @@ class NetatmoDataHandler:
self._queue.rotate(BATCH_SIZE)
@callback
def async_force_update(self, data_class_entry):
def async_force_update(self, data_class_entry: str) -> None:
"""Prioritize data retrieval for given data class entry."""
self.data_classes[data_class_entry].next_scan = time()
self._queue.rotate(-(self._queue.index(self.data_classes[data_class_entry])))
async def async_cleanup(self):
async def async_cleanup(self) -> None:
"""Clean up the Netatmo data handler."""
for listener in self.listeners:
listener()
async def handle_event(self, event):
async def handle_event(self, event: dict) -> None:
"""Handle webhook events."""
if event["data"][WEBHOOK_PUSH_TYPE] == WEBHOOK_ACTIVATION:
_LOGGER.info("%s webhook successfully registered", MANUFACTURER)
@ -139,7 +140,7 @@ class NetatmoDataHandler:
_LOGGER.debug("%s camera reconnected", MANUFACTURER)
self.async_force_update(CAMERA_DATA_CLASS_NAME)
async def async_fetch_data(self, data_class_entry):
async def async_fetch_data(self, data_class_entry: str) -> None:
"""Fetch data and notify."""
if self.data[data_class_entry] is None:
return
@ -163,8 +164,12 @@ class NetatmoDataHandler:
update_callback()
async def register_data_class(
self, data_class_name, data_class_entry, update_callback, **kwargs
):
self,
data_class_name: str,
data_class_entry: str,
update_callback: CALLBACK_TYPE,
**kwargs: Any,
) -> None:
"""Register data class."""
if data_class_entry in self.data_classes:
if update_callback not in self.data_classes[data_class_entry].subscriptions:
@ -189,7 +194,9 @@ class NetatmoDataHandler:
self._queue.append(self.data_classes[data_class_entry])
_LOGGER.debug("Data class %s added", data_class_entry)
async def unregister_data_class(self, data_class_entry, update_callback):
async def unregister_data_class(
self, data_class_entry: str, update_callback: CALLBACK_TYPE | None
) -> None:
"""Unregister data class."""
self.data_classes[data_class_entry].subscriptions.remove(update_callback)