Use PEP 526 type annotations, add some type hints (#26464)

* Add some more type hints to helpers.event

* Change most type comments to variable types

* Remove some superfluous type hints
This commit is contained in:
Ville Skyttä 2019-09-07 09:48:58 +03:00 committed by GitHub
parent 5b3004c7b0
commit 33e1b44b3a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
35 changed files with 152 additions and 174 deletions

View file

@ -20,7 +20,7 @@ TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN = "long_lived_access_token"
class Group:
"""A group."""
name = attr.ib(type=str) # type: Optional[str]
name = attr.ib(type=Optional[str])
policy = attr.ib(type=perm_mdl.PolicyType)
id = attr.ib(type=str, factory=lambda: uuid.uuid4().hex)
system_generated = attr.ib(type=bool, default=False)
@ -30,22 +30,20 @@ class Group:
class User:
"""A user."""
name = attr.ib(type=str) # type: Optional[str]
name = attr.ib(type=Optional[str])
perm_lookup = attr.ib(type=perm_mdl.PermissionLookup, cmp=False)
id = attr.ib(type=str, factory=lambda: uuid.uuid4().hex)
is_owner = attr.ib(type=bool, default=False)
is_active = attr.ib(type=bool, default=False)
system_generated = attr.ib(type=bool, default=False)
groups = attr.ib(type=List, factory=list, cmp=False) # type: List[Group]
groups = attr.ib(type=List[Group], factory=list, cmp=False)
# List of credentials of a user.
credentials = attr.ib(type=list, factory=list, cmp=False) # type: List[Credentials]
credentials = attr.ib(type=List["Credentials"], factory=list, cmp=False)
# Tokens associated with a user.
refresh_tokens = attr.ib(
type=dict, factory=dict, cmp=False
) # type: Dict[str, RefreshToken]
refresh_tokens = attr.ib(type=Dict[str, "RefreshToken"], factory=dict, cmp=False)
_permissions = attr.ib(
type=Optional[perm_mdl.PolicyPermissions], init=False, cmp=False, default=None

View file

@ -2,6 +2,7 @@
from collections import namedtuple
from datetime import timedelta
import logging
from typing import List
import voluptuous as vol
@ -41,12 +42,11 @@ class BboxDeviceScanner(DeviceScanner):
def __init__(self, config):
"""Get host from config."""
from typing import List # noqa: pylint: disable=unused-import
self.host = config[CONF_HOST]
"""Initialize the scanner."""
self.last_results = [] # type: List[Device]
self.last_results: List[Device] = []
self.success_init = self._update_info()
_LOGGER.info("Scanner initialized")

View file

@ -13,7 +13,7 @@ from homeassistant.util import Throttle
_LOGGER = logging.getLogger(__name__)
BANDWIDTH_MEGABITS_SECONDS = "Mb/s" # type: str
BANDWIDTH_MEGABITS_SECONDS = "Mb/s"
ATTRIBUTION = "Powered by Bouygues Telecom"

View file

@ -81,13 +81,13 @@ class BuienradarCam(Camera):
# invariant: this condition is private to and owned by this instance.
self._condition = asyncio.Condition()
self._last_image = None # type: Optional[bytes]
self._last_image: Optional[bytes] = None
# value of the last seen last modified header
self._last_modified = None # type: Optional[str]
self._last_modified: Optional[str] = None
# loading status
self._loading = False
# deadline for image refresh - self.delta after last successful load
self._deadline = None # type: Optional[datetime]
self._deadline: Optional[datetime] = None
@property
def name(self) -> str:

View file

@ -525,32 +525,33 @@ class CastDevice(MediaPlayerDevice):
"elected leader" itself.
"""
def __init__(self, cast_info):
def __init__(self, cast_info: ChromecastInfo):
"""Initialize the cast device."""
import pychromecast # noqa: pylint: disable=unused-import
import pychromecast
self._cast_info = cast_info # type: ChromecastInfo
self._cast_info = cast_info
self.services = None
if cast_info.service:
self.services = set()
self.services.add(cast_info.service)
self._chromecast = None # type: Optional[pychromecast.Chromecast]
self._chromecast: Optional[pychromecast.Chromecast] = None
self.cast_status = None
self.media_status = None
self.media_status_received = None
self._dynamic_group_cast_info = None # type: ChromecastInfo
self._dynamic_group_cast = None # type: Optional[pychromecast.Chromecast]
self._dynamic_group_cast_info: ChromecastInfo = None
self._dynamic_group_cast: Optional[pychromecast.Chromecast] = None
self.dynamic_group_media_status = None
self.dynamic_group_media_status_received = None
self.mz_media_status = {}
self.mz_media_status_received = {}
self.mz_mgr = None
self._available = False # type: bool
self._dynamic_group_available = False # type: bool
self._status_listener = None # type: Optional[CastStatusListener]
self._dynamic_group_status_listener = (
None
) # type: Optional[DynamicGroupCastStatusListener]
self._available = False
self._dynamic_group_available = False
self._status_listener: Optional[CastStatusListener] = None
self._dynamic_group_status_listener: Optional[
DynamicGroupCastStatusListener
] = None
self._add_remove_handler = None
self._del_remove_handler = None

View file

@ -18,7 +18,7 @@ from .const import ATTR_SOURCE_TYPE, DOMAIN, LOGGER
async def async_setup_entry(hass, entry):
"""Set up an entry."""
component = hass.data.get(DOMAIN) # type: Optional[EntityComponent]
component: Optional[EntityComponent] = hass.data.get(DOMAIN)
if component is None:
component = hass.data[DOMAIN] = EntityComponent(LOGGER, DOMAIN, hass)

View file

@ -327,15 +327,15 @@ class DeviceTracker:
class Device(RestoreEntity):
"""Represent a tracked device."""
host_name = None # type: str
location_name = None # type: str
gps = None # type: GPSType
gps_accuracy = 0 # type: int
last_seen = None # type: dt_util.dt.datetime
consider_home = None # type: dt_util.dt.timedelta
battery = None # type: int
attributes = None # type: dict
icon = None # type: str
host_name: str = None
location_name: str = None
gps: GPSType = None
gps_accuracy: int = 0
last_seen: dt_util.dt.datetime = None
consider_home: dt_util.dt.timedelta = None
battery: int = None
attributes: dict = None
icon: str = None
# Track if the last update of this device was HOME.
last_update_home = False

View file

@ -147,7 +147,7 @@ def async_setup_scanner_platform(
scanner.hass = hass
# Initial scan of each mac we also tell about host name for config
seen = set() # type: Any
seen: Any = set()
async def async_device_tracker_scan(now: dt_util.dt.datetime):
"""Handle interval matches."""

View file

@ -26,10 +26,10 @@ from homeassistant.exceptions import PlatformNotReady
_LOGGER = logging.getLogger(__name__)
GIGABITS = "Gb" # type: str
PRICE = "CAD" # type: str
DAYS = "days" # type: str
PERCENT = "%" # type: str
GIGABITS = "Gb"
PRICE = "CAD"
DAYS = "days"
PERCENT = "%"
DEFAULT_NAME = "EBox"

View file

@ -203,7 +203,7 @@ async def _setup_auto_reconnect_logic(
# When removing/disconnecting manually
return
data = hass.data[DOMAIN][entry.entry_id] # type: RuntimeEntryData
data: RuntimeEntryData = hass.data[DOMAIN][entry.entry_id]
for disconnect_cb in data.disconnect_callbacks:
disconnect_cb()
data.disconnect_callbacks = []
@ -326,7 +326,7 @@ async def _cleanup_instance(
hass: HomeAssistantType, entry: ConfigEntry
) -> RuntimeEntryData:
"""Cleanup the esphome client if it exists."""
data = hass.data[DATA_KEY].pop(entry.entry_id) # type: RuntimeEntryData
data: RuntimeEntryData = hass.data[DATA_KEY].pop(entry.entry_id)
if data.reconnect_task is not None:
data.reconnect_task.cancel()
for disconnect_cb in data.disconnect_callbacks:
@ -363,7 +363,7 @@ async def platform_async_setup_entry(
This method is in charge of receiving, distributing and storing
info and state updates.
"""
entry_data = hass.data[DOMAIN][entry.entry_id] # type: RuntimeEntryData
entry_data: RuntimeEntryData = hass.data[DOMAIN][entry.entry_id]
entry_data.info[component_key] = {}
entry_data.state[component_key] = {}
@ -468,7 +468,7 @@ class EsphomeEntity(Entity):
self._entry_id = entry_id
self._component_key = component_key
self._key = key
self._remove_callbacks = [] # type: List[Callable[[], None]]
self._remove_callbacks: List[Callable[[], None]] = []
async def async_added_to_hass(self) -> None:
"""Register callbacks."""

View file

@ -19,9 +19,9 @@ class EsphomeFlowHandler(config_entries.ConfigFlow):
def __init__(self):
"""Initialize flow."""
self._host = None # type: Optional[str]
self._port = None # type: Optional[int]
self._password = None # type: Optional[str]
self._host: Optional[str] = None
self._port: Optional[int] = None
self._password: Optional[str] = None
async def async_step_user(
self, user_input: Optional[ConfigType] = None, error: Optional[str] = None
@ -94,9 +94,7 @@ class EsphomeFlowHandler(config_entries.ConfigFlow):
already_configured = True
elif entry.entry_id in self.hass.data.get(DATA_KEY, {}):
# Does a config entry with this name already exist?
data = self.hass.data[DATA_KEY][
entry.entry_id
] # type: RuntimeEntryData
data: RuntimeEntryData = self.hass.data[DATA_KEY][entry.entry_id]
# Node names are unique in the network
if data.device_info is not None:
already_configured = data.device_info.name == node_name

View file

@ -55,23 +55,21 @@ PROP_TO_ATTR = {
"speed_list": ATTR_SPEED_LIST,
"oscillating": ATTR_OSCILLATING,
"current_direction": ATTR_DIRECTION,
} # type: dict
}
FAN_SET_SPEED_SCHEMA = ENTITY_SERVICE_SCHEMA.extend(
{vol.Required(ATTR_SPEED): cv.string}
) # type: dict
)
FAN_TURN_ON_SCHEMA = ENTITY_SERVICE_SCHEMA.extend(
{vol.Optional(ATTR_SPEED): cv.string}
) # type: dict
FAN_TURN_ON_SCHEMA = ENTITY_SERVICE_SCHEMA.extend({vol.Optional(ATTR_SPEED): cv.string})
FAN_OSCILLATE_SCHEMA = ENTITY_SERVICE_SCHEMA.extend(
{vol.Required(ATTR_OSCILLATING): cv.boolean}
) # type: dict
)
FAN_SET_DIRECTION_SCHEMA = ENTITY_SERVICE_SCHEMA.extend(
{vol.Optional(ATTR_DIRECTION): cv.string}
) # type: dict
)
@bind_hass
@ -198,7 +196,7 @@ class FanEntity(ToggleEntity):
@property
def state_attributes(self) -> dict:
"""Return optional state attributes."""
data = {} # type: dict
data = {}
for prop, attr in PROP_TO_ATTR.items():
if not hasattr(self, prop):

View file

@ -25,10 +25,10 @@ import homeassistant.helpers.config_validation as cv
_LOGGER = logging.getLogger(__name__)
KILOBITS = "Kb" # type: str
PRICE = "CAD" # type: str
MESSAGES = "messages" # type: str
MINUTES = "minutes" # type: str
KILOBITS = "Kb"
PRICE = "CAD"
MESSAGES = "messages"
MINUTES = "minutes"
DEFAULT_NAME = "Fido"

View file

@ -162,11 +162,11 @@ class FinTsAccount(Entity):
def __init__(self, client: FinTsClient, account, name: str) -> None:
"""Initialize a FinTs balance account."""
self._client = client # type: FinTsClient
self._client = client
self._account = account
self._name = name # type: str
self._balance = None # type: float
self._currency = None # type: str
self._name = name
self._balance: float = None
self._currency: str = None
@property
def should_poll(self) -> bool:
@ -222,11 +222,11 @@ class FinTsHoldingsAccount(Entity):
def __init__(self, client: FinTsClient, account, name: str) -> None:
"""Initialize a FinTs holdings account."""
self._client = client # type: FinTsClient
self._name = name # type: str
self._client = client
self._name = name
self._account = account
self._holdings = []
self._total = None # type: float
self._total: float = None
@property
def should_poll(self) -> bool:

View file

@ -93,7 +93,7 @@ class GoogleAssistantView(HomeAssistantView):
async def post(self, request: Request) -> Response:
"""Handle Google Assistant requests."""
message = await request.json() # type: dict
message: dict = await request.json()
result = await async_handle_message(
request.app["hass"], self.config, request["hass_user"].id, message
)

View file

@ -24,7 +24,7 @@ _LOGGER = logging.getLogger(__name__)
async def async_handle_message(hass, config, user_id, message):
"""Handle incoming API messages."""
request_id = message.get("requestId") # type: str
request_id: str = message.get("requestId")
data = RequestData(config, user_id, request_id)
@ -38,7 +38,7 @@ async def async_handle_message(hass, config, user_id, message):
async def _process(hass, data, message):
"""Process a message."""
inputs = message.get("inputs") # type: list
inputs: list = message.get("inputs")
if len(inputs) != 1:
return {

View file

@ -75,19 +75,19 @@ class LightGroup(light.Light):
def __init__(self, name: str, entity_ids: List[str]) -> None:
"""Initialize a light group."""
self._name = name # type: str
self._entity_ids = entity_ids # type: List[str]
self._is_on = False # type: bool
self._available = False # type: bool
self._brightness = None # type: Optional[int]
self._hs_color = None # type: Optional[Tuple[float, float]]
self._color_temp = None # type: Optional[int]
self._min_mireds = 154 # type: Optional[int]
self._max_mireds = 500 # type: Optional[int]
self._white_value = None # type: Optional[int]
self._effect_list = None # type: Optional[List[str]]
self._effect = None # type: Optional[str]
self._supported_features = 0 # type: int
self._name = name
self._entity_ids = entity_ids
self._is_on = False
self._available = False
self._brightness: Optional[int] = None
self._hs_color: Optional[Tuple[float, float]] = None
self._color_temp: Optional[int] = None
self._min_mireds: Optional[int] = 154
self._max_mireds: Optional[int] = 500
self._white_value: Optional[int] = None
self._effect_list: Optional[List[str]] = None
self._effect: Optional[str] = None
self._supported_features: int = 0
self._async_unsub_state_changed = None
async def async_added_to_hass(self) -> None:

View file

@ -256,7 +256,7 @@ def get_next_departure(
_LOGGER.debug("Timetable: %s", sorted(timetable.keys()))
item = {} # type: dict
item = {}
for key in sorted(timetable.keys()):
if dt_util.parse_datetime(key) > now:
item = timetable[key]
@ -393,11 +393,11 @@ class GTFSDepartureSensor(Entity):
self._available = False
self._icon = ICON
self._name = ""
self._state = None # type: Optional[str]
self._attributes = {} # type: dict
self._state: Optional[str] = None
self._attributes = {}
self._agency = None
self._departure = {} # type: dict
self._departure = {}
self._destination = None
self._origin = None
self._route = None

View file

@ -28,9 +28,9 @@ import homeassistant.helpers.config_validation as cv
_LOGGER = logging.getLogger(__name__)
KILOWATT_HOUR = ENERGY_KILO_WATT_HOUR
PRICE = "CAD" # type: str
DAYS = "days" # type: str
CONF_CONTRACT = "contract" # type: str
PRICE = "CAD"
DAYS = "days"
CONF_CONTRACT = "contract"
DEFAULT_NAME = "HydroQuebec"

View file

@ -459,7 +459,7 @@ class ISYDevice(Entity):
"""Representation of an ISY994 device."""
_attrs = {}
_name = None # type: str
_name: str = None
def __init__(self, node) -> None:
"""Initialize the insteon device."""

View file

@ -10,7 +10,7 @@ import os
import socket
import ssl
import time
from typing import Any, Callable, List, Optional, Union, cast # noqa: F401
from typing import Any, Callable, List, Optional, Union
import attr
import requests.certs
@ -479,7 +479,7 @@ async def _async_setup_server(hass: HomeAssistantType, config: ConfigType):
This method is a coroutine.
"""
conf = config.get(DOMAIN, {}) # type: ConfigType
conf: ConfigType = config.get(DOMAIN, {})
success, broker_config = await server.async_start(
hass, conf.get(CONF_PASSWORD), conf.get(CONF_EMBEDDED)
@ -502,16 +502,16 @@ async def _async_setup_discovery(
_LOGGER.error("Unable to load MQTT discovery")
return False
success = await discovery.async_start(
success: bool = await discovery.async_start(
hass, conf[CONF_DISCOVERY_PREFIX], hass_config, config_entry
) # type: bool
)
return success
async def async_setup(hass: HomeAssistantType, config: ConfigType) -> bool:
"""Start the MQTT protocol service."""
conf = config.get(DOMAIN) # type: Optional[ConfigType]
conf: Optional[ConfigType] = config.get(DOMAIN)
# We need this because discovery can cause components to be set up and
# otherwise it will not load the users config.
@ -621,7 +621,7 @@ async def async_setup_entry(hass, entry):
birth_message = None
# Be able to override versions other than TLSv1.0 under Python3.6
conf_tls_version = conf.get(CONF_TLS_VERSION) # type: str
conf_tls_version: str = conf.get(CONF_TLS_VERSION)
if conf_tls_version == "1.2":
tls_version = ssl.PROTOCOL_TLSv1_2
elif conf_tls_version == "1.1":
@ -655,7 +655,7 @@ async def async_setup_entry(hass, entry):
tls_version=tls_version,
)
result = await hass.data[DATA_MQTT].async_connect() # type: str
result: str = await hass.data[DATA_MQTT].async_connect()
if result == CONNECTION_FAILED:
return False
@ -671,11 +671,11 @@ async def async_setup_entry(hass, entry):
async def async_publish_service(call: ServiceCall):
"""Handle MQTT publish service calls."""
msg_topic = call.data[ATTR_TOPIC] # type: str
msg_topic: str = call.data[ATTR_TOPIC]
payload = call.data.get(ATTR_PAYLOAD)
payload_template = call.data.get(ATTR_PAYLOAD_TEMPLATE)
qos = call.data[ATTR_QOS] # type: int
retain = call.data[ATTR_RETAIN] # type: bool
qos: int = call.data[ATTR_QOS]
retain: bool = call.data[ATTR_RETAIN]
if payload_template is not None:
try:
payload = template.Template(payload_template, hass).async_render()
@ -741,14 +741,14 @@ class MQTT:
self.broker = broker
self.port = port
self.keepalive = keepalive
self.subscriptions = [] # type: List[Subscription]
self.subscriptions: List[Subscription] = []
self.birth_message = birth_message
self.connected = False
self._mqttc = None # type: mqtt.Client
self._mqttc: mqtt.Client = None
self._paho_lock = asyncio.Lock()
if protocol == PROTOCOL_31:
proto = mqtt.MQTTv31 # type: int
proto: int = mqtt.MQTTv31
else:
proto = mqtt.MQTTv311
@ -796,7 +796,7 @@ class MQTT:
This method is a coroutine.
"""
result = None # type: int
result: int = None
try:
result = await self.hass.async_add_job(
self._mqttc.connect, self.broker, self.port, self.keepalive
@ -870,7 +870,7 @@ class MQTT:
This method is a coroutine.
"""
async with self._paho_lock:
result = None # type: int
result: int = None
result, _ = await self.hass.async_add_job(self._mqttc.unsubscribe, topic)
_raise_on_error(result)
@ -879,7 +879,7 @@ class MQTT:
_LOGGER.debug("Subscribing to %s", topic)
async with self._paho_lock:
result = None # type: int
result: int = None
result, _ = await self.hass.async_add_job(self._mqttc.subscribe, topic, qos)
_raise_on_error(result)
@ -928,7 +928,7 @@ class MQTT:
if not _match_topic(subscription.topic, msg.topic):
continue
payload = msg.payload # type: SubscribePayloadType
payload: SubscribePayloadType = msg.payload
if subscription.encoding is not None:
try:
payload = msg.payload.decode(subscription.encoding)
@ -1077,7 +1077,7 @@ class MqttAvailability(Entity):
def __init__(self, config: dict) -> None:
"""Initialize the availability mixin."""
self._availability_sub_state = None
self._available = False # type: bool
self._available = False
self._avail_config = config

View file

@ -1,8 +1,6 @@
"""Support for Onkyo Receivers."""
import logging
# pylint: disable=unused-import
from typing import List # noqa: F401
from typing import List
import voluptuous as vol
@ -54,7 +52,7 @@ SUPPORT_ONKYO_WO_VOLUME = (
| SUPPORT_PLAY_MEDIA
)
KNOWN_HOSTS = [] # type: List[str]
KNOWN_HOSTS: List[str] = []
DEFAULT_SOURCES = {
"tv": "TV",
"bd": "Bluray",

View file

@ -441,7 +441,7 @@ def ws_list_person(
hass: HomeAssistantType, connection: websocket_api.ActiveConnection, msg
):
"""List persons."""
manager = hass.data[DOMAIN] # type: PersonManager
manager: PersonManager = hass.data[DOMAIN]
connection.send_result(
msg["id"],
{"storage": manager.storage_persons, "config": manager.config_persons},
@ -464,7 +464,7 @@ async def ws_create_person(
hass: HomeAssistantType, connection: websocket_api.ActiveConnection, msg
):
"""Create a person."""
manager = hass.data[DOMAIN] # type: PersonManager
manager: PersonManager = hass.data[DOMAIN]
try:
person = await manager.async_create_person(
name=msg["name"],
@ -495,7 +495,7 @@ async def ws_update_person(
hass: HomeAssistantType, connection: websocket_api.ActiveConnection, msg
):
"""Update a person."""
manager = hass.data[DOMAIN] # type: PersonManager
manager: PersonManager = hass.data[DOMAIN]
changes = {}
for key in ("name", "user_id", "device_trackers"):
if key in msg:
@ -519,7 +519,7 @@ async def ws_delete_person(
hass: HomeAssistantType, connection: websocket_api.ActiveConnection, msg
):
"""Delete a person."""
manager = hass.data[DOMAIN] # type: PersonManager
manager: PersonManager = hass.data[DOMAIN]
await manager.async_delete_person(msg["person_id"])
connection.send_result(msg["id"])

View file

@ -211,8 +211,8 @@ class Proximity(Entity):
# Loop through each of the distances collected and work out the
# closest.
closest_device = None # type: str
dist_to_zone = None # type: float
closest_device: str = None
dist_to_zone: float = None
for device in distances_to_zone:
if not dist_to_zone or distances_to_zone[device] < dist_to_zone:

View file

@ -7,7 +7,7 @@ import logging
import queue
import threading
import time
from typing import Any, Dict, Optional # noqa: F401
from typing import Any, Dict, Optional
import voluptuous as vol
@ -177,12 +177,12 @@ class Recorder(threading.Thread):
self.hass = hass
self.keep_days = keep_days
self.purge_interval = purge_interval
self.queue = queue.Queue() # type: Any
self.queue: Any = queue.Queue()
self.recording_start = dt_util.utcnow()
self.db_url = uri
self.async_db_ready = asyncio.Future()
self.engine = None # type: Any
self.run_info = None # type: Any
self.engine: Any = None
self.run_info: Any = None
self.entity_filter = generate_filter(
include.get(CONF_DOMAINS, []),

View file

@ -18,8 +18,8 @@ _LOGGER = logging.getLogger(__name__)
DEFAULT_NAME = "Start.ca"
CONF_TOTAL_BANDWIDTH = "total_bandwidth"
GIGABYTES = "GB" # type: str
PERCENT = "%" # type: str
GIGABYTES = "GB"
PERCENT = "%"
MIN_TIME_BETWEEN_UPDATES = timedelta(hours=1)
REQUEST_TIMEOUT = 5 # seconds

View file

@ -48,10 +48,10 @@ class LightSwitch(Light):
def __init__(self, name: str, switch_entity_id: str) -> None:
"""Initialize Light Switch."""
self._name = name # type: str
self._switch_entity_id = switch_entity_id # type: str
self._is_on = False # type: bool
self._available = False # type: bool
self._name = name
self._switch_entity_id = switch_entity_id
self._is_on = False
self._available = False
self._async_unsub_state_changed = None
@property

View file

@ -143,7 +143,7 @@ class SwitcherControl(SwitchDevice):
STATE_ON as SWITCHER_STATE_ON,
)
response = None # type: SwitcherV2ControlResponseMSG
response: "SwitcherV2ControlResponseMSG" = None
async with SwitcherV2Api(
self.hass.loop,
self._device_data.ip_addr,

View file

@ -17,8 +17,8 @@ _LOGGER = logging.getLogger(__name__)
DEFAULT_NAME = "TekSavvy"
CONF_TOTAL_BANDWIDTH = "total_bandwidth"
GIGABYTES = "GB" # type: str
PERCENT = "%" # type: str
GIGABYTES = "GB"
PERCENT = "%"
MIN_TIME_BETWEEN_UPDATES = timedelta(hours=1)
REQUEST_TIMEOUT = 5 # seconds

View file

@ -3,7 +3,7 @@ import asyncio
from datetime import timedelta
import logging
from urllib.parse import urlparse
from typing import Dict # noqa: F401 pylint: disable=unused-import
from typing import Dict
import voluptuous as vol
@ -36,7 +36,7 @@ from homeassistant.const import (
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.script import Script
_CONFIGURING = {} # type: Dict[str, str]
_CONFIGURING: Dict[str, str] = {}
_LOGGER = logging.getLogger(__name__)
CONF_SOURCES = "sources"

View file

@ -2,7 +2,7 @@
# pylint: disable=W0611
from collections import OrderedDict
import logging
from typing import MutableMapping # noqa: F401
from typing import MutableMapping
from typing import cast
import attr
@ -35,7 +35,7 @@ class ZhaDeviceStorage:
def __init__(self, hass: HomeAssistantType) -> None:
"""Initialize the zha device storage."""
self.hass = hass
self.devices = {} # type: MutableMapping[str, ZhaDeviceEntry]
self.devices: MutableMapping[str, ZhaDeviceEntry] = {}
self._store = hass.helpers.storage.Store(STORAGE_VERSION, STORAGE_KEY)
@callback
@ -88,7 +88,7 @@ class ZhaDeviceStorage:
"""Load the registry of zha device entries."""
data = await self._store.async_load()
devices = OrderedDict() # type: OrderedDict[str, ZhaDeviceEntry]
devices: "OrderedDict[str, ZhaDeviceEntry]" = OrderedDict()
if data is not None:
for device in data["devices"]:

View file

@ -53,7 +53,7 @@ class RegistryEntry:
device_id = attr.ib(type=str, default=None)
config_entry_id = attr.ib(type=str, default=None)
disabled_by = attr.ib(
type=str,
type=Optional[str],
default=None,
validator=attr.validators.in_(
(
@ -64,7 +64,7 @@ class RegistryEntry:
None,
)
),
) # type: Optional[str]
)
domain = attr.ib(type=str, init=False, repr=False)
@domain.default

View file

@ -1,5 +1,5 @@
"""Helpers for listening to events."""
from datetime import timedelta
from datetime import datetime, timedelta
import functools as ft
from typing import Callable
@ -21,8 +21,7 @@ from homeassistant.util import dt as dt_util
from homeassistant.util.async_ import run_callback_threadsafe
# mypy: allow-incomplete-defs, allow-untyped-calls, allow-untyped-defs
# mypy: no-check-untyped-defs, no-warn-return-any
# mypy: allow-untyped-calls, allow-untyped-defs, no-check-untyped-defs
# PyLint does not like the use of threaded_listener_factory
# pylint: disable=invalid-name
@ -187,7 +186,9 @@ track_same_state = threaded_listener_factory(async_track_same_state)
@callback
@bind_hass
def async_track_point_in_time(hass, action, point_in_time) -> CALLBACK_TYPE:
def async_track_point_in_time(
hass: HomeAssistant, action: Callable[..., None], point_in_time: datetime
) -> CALLBACK_TYPE:
"""Add a listener that fires once after a specific point in time."""
utc_point_in_time = dt_util.as_utc(point_in_time)
@ -204,7 +205,9 @@ track_point_in_time = threaded_listener_factory(async_track_point_in_time)
@callback
@bind_hass
def async_track_point_in_utc_time(hass, action, point_in_time) -> CALLBACK_TYPE:
def async_track_point_in_utc_time(
hass: HomeAssistant, action: Callable[..., None], point_in_time: datetime
) -> CALLBACK_TYPE:
"""Add a listener that fires once after a specific point in UTC time."""
# Ensure point_in_time is UTC
point_in_time = dt_util.as_utc(point_in_time)
@ -284,8 +287,8 @@ class SunListener:
action = attr.ib(type=Callable)
event = attr.ib(type=str)
offset = attr.ib(type=timedelta)
_unsub_sun = attr.ib(default=None)
_unsub_config = attr.ib(default=None)
_unsub_sun: CALLBACK_TYPE = attr.ib(default=None)
_unsub_config: CALLBACK_TYPE = attr.ib(default=None)
@callback
def async_attach(self):

View file

@ -322,9 +322,7 @@ class CircularDependency(LoaderError):
def _load_file(
hass, # type: HomeAssistant
comp_or_platform: str,
base_paths: List[str],
hass: "HomeAssistant", comp_or_platform: str, base_paths: List[str]
) -> Optional[ModuleType]:
"""Try to load specified file.
@ -391,11 +389,7 @@ def _load_file(
class ModuleWrapper:
"""Class to wrap a Python module and auto fill in hass argument."""
def __init__(
self,
hass, # type: HomeAssistant
module: ModuleType,
) -> None:
def __init__(self, hass: "HomeAssistant", module: ModuleType) -> None:
"""Initialize the module wrapper."""
self._hass = hass
self._module = module
@ -414,9 +408,7 @@ class ModuleWrapper:
class Components:
"""Helper to load components."""
def __init__(
self, hass # type: HomeAssistant
) -> None:
def __init__(self, hass: "HomeAssistant") -> None:
"""Initialize the Components class."""
self._hass = hass
@ -442,9 +434,7 @@ class Components:
class Helpers:
"""Helper to load helpers."""
def __init__(
self, hass # type: HomeAssistant
) -> None:
def __init__(self, hass: "HomeAssistant") -> None:
"""Initialize the Helpers class."""
self._hass = hass
@ -462,10 +452,7 @@ def bind_hass(func: CALLABLE_T) -> CALLABLE_T:
return func
async def async_component_dependencies(
hass, # type: HomeAssistant
domain: str,
) -> Set[str]:
async def async_component_dependencies(hass: "HomeAssistant", domain: str) -> Set[str]:
"""Return all dependencies and subdependencies of components.
Raises CircularDependency if a circular dependency is found.
@ -474,10 +461,7 @@ async def async_component_dependencies(
async def _async_component_dependencies(
hass, # type: HomeAssistant
domain: str,
loaded: Set[str],
loading: Set,
hass: "HomeAssistant", domain: str, loaded: Set[str], loading: Set
) -> Set[str]:
"""Recursive function to get component dependencies.
@ -508,9 +492,7 @@ async def _async_component_dependencies(
return loaded
def _async_mount_config_dir(
hass, # type: HomeAssistant
) -> bool:
def _async_mount_config_dir(hass: "HomeAssistant") -> bool:
"""Mount config dir in order to load custom_component.
Async friendly but not a coroutine.

View file

@ -93,7 +93,7 @@ class MockSwitcherV2Device:
@fixture(name="mock_bridge")
def mock_bridge_fixture() -> Generator[None, Any, None]:
"""Fixture for mocking aioswitcher.bridge.SwitcherV2Bridge."""
queue = Queue() # type: Queue
queue = Queue()
async def mock_queue():
"""Mock asyncio's Queue."""