From e55702d63528f52d5324cf24d233399b47607c2b Mon Sep 17 00:00:00 2001 From: Marc Mueller <30130371+cdce8p@users.noreply.github.com> Date: Wed, 17 Mar 2021 17:34:55 +0100 Subject: [PATCH] Update typing 01 (#48013) --- homeassistant/__main__.py | 5 +- homeassistant/bootstrap.py | 26 +++-- homeassistant/config.py | 54 ++++----- homeassistant/config_entries.py | 104 ++++++++--------- homeassistant/core.py | 185 +++++++++++++++---------------- homeassistant/data_entry_flow.py | 72 ++++++------ homeassistant/exceptions.py | 16 +-- homeassistant/loader.py | 116 +++++++++---------- homeassistant/requirements.py | 18 +-- homeassistant/runner.py | 10 +- homeassistant/setup.py | 10 +- 11 files changed, 303 insertions(+), 313 deletions(-) diff --git a/homeassistant/__main__.py b/homeassistant/__main__.py index 840e0bed24d..d8256e2ef92 100644 --- a/homeassistant/__main__.py +++ b/homeassistant/__main__.py @@ -1,11 +1,12 @@ """Start Home Assistant.""" +from __future__ import annotations + import argparse import os import platform import subprocess import sys import threading -from typing import List from homeassistant.const import REQUIRED_PYTHON_VER, RESTART_EXIT_CODE, __version__ @@ -206,7 +207,7 @@ def closefds_osx(min_fd: int, max_fd: int) -> None: pass -def cmdline() -> List[str]: +def cmdline() -> list[str]: """Collect path and arguments to re-execute the current hass instance.""" if os.path.basename(sys.argv[0]) == "__main__.py": modulepath = os.path.dirname(sys.argv[0]) diff --git a/homeassistant/bootstrap.py b/homeassistant/bootstrap.py index 6d334ac8953..8b191ae3a47 100644 --- a/homeassistant/bootstrap.py +++ b/homeassistant/bootstrap.py @@ -1,4 +1,6 @@ """Provide methods to bootstrap a Home Assistant instance.""" +from __future__ import annotations + import asyncio import contextlib from datetime import datetime @@ -8,7 +10,7 @@ import os import sys import threading from time import monotonic -from typing import TYPE_CHECKING, Any, Dict, Optional, Set +from typing import TYPE_CHECKING, Any import voluptuous as vol import yarl @@ -75,7 +77,7 @@ STAGE_1_INTEGRATIONS = { async def async_setup_hass( runtime_config: "RuntimeConfig", -) -> Optional[core.HomeAssistant]: +) -> core.HomeAssistant | None: """Set up Home Assistant.""" hass = core.HomeAssistant() hass.config.config_dir = runtime_config.config_dir @@ -188,7 +190,7 @@ def open_hass_ui(hass: core.HomeAssistant) -> None: async def async_from_config_dict( config: ConfigType, hass: core.HomeAssistant -) -> Optional[core.HomeAssistant]: +) -> core.HomeAssistant | None: """Try to configure Home Assistant from a configuration dictionary. Dynamically loads required components and its dependencies. @@ -255,8 +257,8 @@ async def async_from_config_dict( def async_enable_logging( hass: core.HomeAssistant, verbose: bool = False, - log_rotate_days: Optional[int] = None, - log_file: Optional[str] = None, + log_rotate_days: int | None = None, + log_file: str | None = None, log_no_color: bool = False, ) -> None: """Set up the logging. @@ -362,7 +364,7 @@ async def async_mount_local_lib_path(config_dir: str) -> str: @core.callback -def _get_domains(hass: core.HomeAssistant, config: Dict[str, Any]) -> Set[str]: +def _get_domains(hass: core.HomeAssistant, config: dict[str, Any]) -> set[str]: """Get domains of components to set up.""" # Filter out the repeating and common config section [homeassistant] domains = {key.split(" ")[0] for key in config if key != core.DOMAIN} @@ -379,7 +381,7 @@ def _get_domains(hass: core.HomeAssistant, config: Dict[str, Any]) -> Set[str]: async def _async_log_pending_setups( - hass: core.HomeAssistant, domains: Set[str], setup_started: Dict[str, datetime] + hass: core.HomeAssistant, domains: set[str], setup_started: dict[str, datetime] ) -> None: """Periodic log of setups that are pending for longer than LOG_SLOW_STARTUP_INTERVAL.""" while True: @@ -396,9 +398,9 @@ async def _async_log_pending_setups( async def async_setup_multi_components( hass: core.HomeAssistant, - domains: Set[str], - config: Dict[str, Any], - setup_started: Dict[str, datetime], + domains: set[str], + config: dict[str, Any], + setup_started: dict[str, datetime], ) -> None: """Set up multiple domains. Log on failure.""" futures = { @@ -422,7 +424,7 @@ async def async_setup_multi_components( async def _async_set_up_integrations( - hass: core.HomeAssistant, config: Dict[str, Any] + hass: core.HomeAssistant, config: dict[str, Any] ) -> None: """Set up all the integrations.""" setup_started = hass.data[DATA_SETUP_STARTED] = {} @@ -430,7 +432,7 @@ async def _async_set_up_integrations( # Resolve all dependencies so we know all integrations # that will have to be loaded and start rightaway - integration_cache: Dict[str, loader.Integration] = {} + integration_cache: dict[str, loader.Integration] = {} to_resolve = domains_to_setup while to_resolve: old_to_resolve = to_resolve diff --git a/homeassistant/config.py b/homeassistant/config.py index cfc1390a37b..362c93d04fa 100644 --- a/homeassistant/config.py +++ b/homeassistant/config.py @@ -1,4 +1,6 @@ """Module to help with parsing and generating configuration files.""" +from __future__ import annotations + from collections import OrderedDict import logging import os @@ -6,7 +8,7 @@ from pathlib import Path import re import shutil from types import ModuleType -from typing import Any, Callable, Dict, Optional, Sequence, Set, Tuple, Union +from typing import Any, Callable, Sequence from awesomeversion import AwesomeVersion import voluptuous as vol @@ -114,14 +116,14 @@ tts: def _no_duplicate_auth_provider( - configs: Sequence[Dict[str, Any]] -) -> Sequence[Dict[str, Any]]: + configs: Sequence[dict[str, Any]] +) -> Sequence[dict[str, Any]]: """No duplicate auth provider config allowed in a list. Each type of auth provider can only have one config without optional id. Unique id is required if same type of auth provider used multiple times. """ - config_keys: Set[Tuple[str, Optional[str]]] = set() + config_keys: set[tuple[str, str | None]] = set() for config in configs: key = (config[CONF_TYPE], config.get(CONF_ID)) if key in config_keys: @@ -135,8 +137,8 @@ def _no_duplicate_auth_provider( def _no_duplicate_auth_mfa_module( - configs: Sequence[Dict[str, Any]] -) -> Sequence[Dict[str, Any]]: + configs: Sequence[dict[str, Any]] +) -> Sequence[dict[str, Any]]: """No duplicate auth mfa module item allowed in a list. Each type of mfa module can only have one config without optional id. @@ -144,7 +146,7 @@ def _no_duplicate_auth_mfa_module( times. Note: this is different than auth provider """ - config_keys: Set[str] = set() + config_keys: set[str] = set() for config in configs: key = config.get(CONF_ID, config[CONF_TYPE]) if key in config_keys: @@ -313,7 +315,7 @@ def _write_default_config(config_dir: str) -> bool: return False -async def async_hass_config_yaml(hass: HomeAssistant) -> Dict: +async def async_hass_config_yaml(hass: HomeAssistant) -> dict: """Load YAML from a Home Assistant configuration file. This function allow a component inside the asyncio loop to reload its @@ -337,8 +339,8 @@ async def async_hass_config_yaml(hass: HomeAssistant) -> Dict: def load_yaml_config_file( - config_path: str, secrets: Optional[Secrets] = None -) -> Dict[Any, Any]: + config_path: str, secrets: Secrets | None = None +) -> dict[Any, Any]: """Parse a YAML configuration file. Raises FileNotFoundError or HomeAssistantError. @@ -421,9 +423,9 @@ def process_ha_config_upgrade(hass: HomeAssistant) -> None: def async_log_exception( ex: Exception, domain: str, - config: Dict, + config: dict, hass: HomeAssistant, - link: Optional[str] = None, + link: str | None = None, ) -> None: """Log an error for configuration validation. @@ -437,8 +439,8 @@ def async_log_exception( @callback def _format_config_error( - ex: Exception, domain: str, config: Dict, link: Optional[str] = None -) -> Tuple[str, bool]: + ex: Exception, domain: str, config: dict, link: str | None = None +) -> tuple[str, bool]: """Generate log exception for configuration validation. This method must be run in the event loop. @@ -474,7 +476,7 @@ def _format_config_error( return message, is_friendly -async def async_process_ha_core_config(hass: HomeAssistant, config: Dict) -> None: +async def async_process_ha_core_config(hass: HomeAssistant, config: dict) -> None: """Process the [homeassistant] section from the configuration. This method is a coroutine. @@ -603,7 +605,7 @@ async def async_process_ha_core_config(hass: HomeAssistant, config: Dict) -> Non ) -def _log_pkg_error(package: str, component: str, config: Dict, message: str) -> None: +def _log_pkg_error(package: str, component: str, config: dict, message: str) -> None: """Log an error while merging packages.""" message = f"Package {package} setup failed. Integration {component} {message}" @@ -616,7 +618,7 @@ def _log_pkg_error(package: str, component: str, config: Dict, message: str) -> _LOGGER.error(message) -def _identify_config_schema(module: ModuleType) -> Optional[str]: +def _identify_config_schema(module: ModuleType) -> str | None: """Extract the schema and identify list or dict based.""" if not isinstance(module.CONFIG_SCHEMA, vol.Schema): # type: ignore return None @@ -664,9 +666,9 @@ def _identify_config_schema(module: ModuleType) -> Optional[str]: return None -def _recursive_merge(conf: Dict[str, Any], package: Dict[str, Any]) -> Union[bool, str]: +def _recursive_merge(conf: dict[str, Any], package: dict[str, Any]) -> bool | str: """Merge package into conf, recursively.""" - error: Union[bool, str] = False + error: bool | str = False for key, pack_conf in package.items(): if isinstance(pack_conf, dict): if not pack_conf: @@ -688,10 +690,10 @@ def _recursive_merge(conf: Dict[str, Any], package: Dict[str, Any]) -> Union[boo async def merge_packages_config( hass: HomeAssistant, - config: Dict, - packages: Dict[str, Any], + config: dict, + packages: dict[str, Any], _log_pkg_error: Callable = _log_pkg_error, -) -> Dict: +) -> dict: """Merge packages into the top-level configuration. Mutate config.""" PACKAGES_CONFIG_SCHEMA(packages) for pack_name, pack_conf in packages.items(): @@ -754,7 +756,7 @@ async def merge_packages_config( async def async_process_component_config( hass: HomeAssistant, config: ConfigType, integration: Integration -) -> Optional[ConfigType]: +) -> ConfigType | None: """Check component configuration and return processed configuration. Returns None on error. @@ -879,13 +881,13 @@ async def async_process_component_config( @callback -def config_without_domain(config: Dict, domain: str) -> Dict: +def config_without_domain(config: dict, domain: str) -> dict: """Return a config with all configuration for a domain removed.""" filter_keys = extract_domain_configs(config, domain) return {key: value for key, value in config.items() if key not in filter_keys} -async def async_check_ha_config_file(hass: HomeAssistant) -> Optional[str]: +async def async_check_ha_config_file(hass: HomeAssistant) -> str | None: """Check if Home Assistant configuration file is valid. This method is a coroutine. @@ -902,7 +904,7 @@ async def async_check_ha_config_file(hass: HomeAssistant) -> Optional[str]: @callback def async_notify_setup_error( - hass: HomeAssistant, component: str, display_link: Optional[str] = None + hass: HomeAssistant, component: str, display_link: str | None = None ) -> None: """Print a persistent notification. diff --git a/homeassistant/config_entries.py b/homeassistant/config_entries.py index c105be42ba3..a18e207a730 100644 --- a/homeassistant/config_entries.py +++ b/homeassistant/config_entries.py @@ -5,7 +5,7 @@ import asyncio import functools import logging from types import MappingProxyType, MethodType -from typing import Any, Callable, Dict, List, Optional, Set, Union, cast +from typing import Any, Callable, Optional, cast import weakref import attr @@ -143,11 +143,11 @@ class ConfigEntry: source: str, connection_class: str, system_options: dict, - options: Optional[dict] = None, - unique_id: Optional[str] = None, - entry_id: Optional[str] = None, + options: dict | None = None, + unique_id: str | None = None, + entry_id: str | None = None, state: str = ENTRY_STATE_NOT_LOADED, - disabled_by: Optional[str] = None, + disabled_by: str | None = None, ) -> None: """Initialize a config entry.""" # Unique id of the config entry @@ -190,18 +190,18 @@ class ConfigEntry: self.supports_unload = False # Listeners to call on update - self.update_listeners: List[ - Union[weakref.ReferenceType[UpdateListenerType], weakref.WeakMethod] + self.update_listeners: list[ + weakref.ReferenceType[UpdateListenerType] | weakref.WeakMethod ] = [] # Function to cancel a scheduled retry - self._async_cancel_retry_setup: Optional[Callable[[], Any]] = None + self._async_cancel_retry_setup: Callable[[], Any] | None = None async def async_setup( self, hass: HomeAssistant, *, - integration: Optional[loader.Integration] = None, + integration: loader.Integration | None = None, tries: int = 0, ) -> None: """Set up an entry.""" @@ -295,7 +295,7 @@ class ConfigEntry: self.state = ENTRY_STATE_SETUP_ERROR async def async_unload( - self, hass: HomeAssistant, *, integration: Optional[loader.Integration] = None + self, hass: HomeAssistant, *, integration: loader.Integration | None = None ) -> bool: """Unload an entry. @@ -442,7 +442,7 @@ class ConfigEntry: return lambda: self.update_listeners.remove(weak_listener) - def as_dict(self) -> Dict[str, Any]: + def as_dict(self) -> dict[str, Any]: """Return dictionary version of this entry.""" return { "entry_id": self.entry_id, @@ -471,8 +471,8 @@ class ConfigEntriesFlowManager(data_entry_flow.FlowManager): self._hass_config = hass_config async def async_finish_flow( - self, flow: data_entry_flow.FlowHandler, result: Dict[str, Any] - ) -> Dict[str, Any]: + self, flow: data_entry_flow.FlowHandler, result: dict[str, Any] + ) -> dict[str, Any]: """Finish a config flow and add an entry.""" flow = cast(ConfigFlow, flow) @@ -542,7 +542,7 @@ class ConfigEntriesFlowManager(data_entry_flow.FlowManager): return result async def async_create_flow( - self, handler_key: Any, *, context: Optional[Dict] = None, data: Any = None + self, handler_key: Any, *, context: dict | None = None, data: Any = None ) -> ConfigFlow: """Create a flow for specified handler. @@ -619,14 +619,14 @@ class ConfigEntries: self.flow = ConfigEntriesFlowManager(hass, self, hass_config) self.options = OptionsFlowManager(hass) self._hass_config = hass_config - self._entries: List[ConfigEntry] = [] + self._entries: list[ConfigEntry] = [] self._store = hass.helpers.storage.Store(STORAGE_VERSION, STORAGE_KEY) EntityRegistryDisabledHandler(hass).async_setup() @callback - def async_domains(self) -> List[str]: + def async_domains(self) -> list[str]: """Return domains for which we have entries.""" - seen: Set[str] = set() + seen: set[str] = set() result = [] for entry in self._entries: @@ -637,7 +637,7 @@ class ConfigEntries: return result @callback - def async_get_entry(self, entry_id: str) -> Optional[ConfigEntry]: + def async_get_entry(self, entry_id: str) -> ConfigEntry | None: """Return entry with matching entry_id.""" for entry in self._entries: if entry_id == entry.entry_id: @@ -645,7 +645,7 @@ class ConfigEntries: return None @callback - def async_entries(self, domain: Optional[str] = None) -> List[ConfigEntry]: + def async_entries(self, domain: str | None = None) -> list[ConfigEntry]: """Return all entries or entries for a specific domain.""" if domain is None: return list(self._entries) @@ -657,7 +657,7 @@ class ConfigEntries: await self.async_setup(entry.entry_id) self._async_schedule_save() - async def async_remove(self, entry_id: str) -> Dict[str, Any]: + async def async_remove(self, entry_id: str) -> dict[str, Any]: """Remove an entry.""" entry = self.async_get_entry(entry_id) @@ -789,7 +789,7 @@ class ConfigEntries: return await self.async_setup(entry_id) async def async_set_disabled_by( - self, entry_id: str, disabled_by: Optional[str] + self, entry_id: str, disabled_by: str | None ) -> bool: """Disable an entry. @@ -829,11 +829,11 @@ class ConfigEntries: self, entry: ConfigEntry, *, - unique_id: Union[str, dict, None, UndefinedType] = UNDEFINED, - title: Union[str, dict, UndefinedType] = UNDEFINED, - data: Union[dict, UndefinedType] = UNDEFINED, - options: Union[dict, UndefinedType] = UNDEFINED, - system_options: Union[dict, UndefinedType] = UNDEFINED, + unique_id: str | dict | None | UndefinedType = UNDEFINED, + title: str | dict | UndefinedType = UNDEFINED, + data: dict | UndefinedType = UNDEFINED, + options: dict | UndefinedType = UNDEFINED, + system_options: dict | UndefinedType = UNDEFINED, ) -> bool: """Update a config entry. @@ -918,12 +918,12 @@ class ConfigEntries: self._store.async_delay_save(self._data_to_save, SAVE_DELAY) @callback - def _data_to_save(self) -> Dict[str, List[Dict[str, Any]]]: + def _data_to_save(self) -> dict[str, list[dict[str, Any]]]: """Return data to save.""" return {"entries": [entry.as_dict() for entry in self._entries]} -async def _old_conf_migrator(old_config: Dict[str, Any]) -> Dict[str, Any]: +async def _old_conf_migrator(old_config: dict[str, Any]) -> dict[str, Any]: """Migrate the pre-0.73 config format to the latest version.""" return {"entries": old_config} @@ -931,7 +931,7 @@ async def _old_conf_migrator(old_config: Dict[str, Any]) -> Dict[str, Any]: class ConfigFlow(data_entry_flow.FlowHandler): """Base class for config flows with some helpers.""" - def __init_subclass__(cls, domain: Optional[str] = None, **kwargs: Any) -> None: + def __init_subclass__(cls, domain: str | None = None, **kwargs: Any) -> None: """Initialize a subclass, register if possible.""" super().__init_subclass__(**kwargs) # type: ignore if domain is not None: @@ -940,7 +940,7 @@ class ConfigFlow(data_entry_flow.FlowHandler): CONNECTION_CLASS = CONN_CLASS_UNKNOWN @property - def unique_id(self) -> Optional[str]: + def unique_id(self) -> str | None: """Return unique ID if available.""" if not self.context: return None @@ -956,7 +956,7 @@ class ConfigFlow(data_entry_flow.FlowHandler): @callback def _abort_if_unique_id_configured( self, - updates: Optional[Dict[Any, Any]] = None, + updates: dict[Any, Any] | None = None, reload_on_update: bool = True, ) -> None: """Abort if the unique ID is already configured.""" @@ -983,8 +983,8 @@ class ConfigFlow(data_entry_flow.FlowHandler): raise data_entry_flow.AbortFlow("already_configured") async def async_set_unique_id( - self, unique_id: Optional[str] = None, *, raise_on_progress: bool = True - ) -> Optional[ConfigEntry]: + self, unique_id: str | None = None, *, raise_on_progress: bool = True + ) -> ConfigEntry | None: """Set a unique ID for the config flow. Returns optionally existing config entry with same ID. @@ -1020,7 +1020,7 @@ class ConfigFlow(data_entry_flow.FlowHandler): self.context["confirm_only"] = True @callback - def _async_current_entries(self, include_ignore: bool = False) -> List[ConfigEntry]: + def _async_current_entries(self, include_ignore: bool = False) -> list[ConfigEntry]: """Return current entries. If the flow is user initiated, filter out ignored entries unless include_ignore is True. @@ -1033,7 +1033,7 @@ class ConfigFlow(data_entry_flow.FlowHandler): return [entry for entry in config_entries if entry.source != SOURCE_IGNORE] @callback - def _async_current_ids(self, include_ignore: bool = True) -> Set[Optional[str]]: + def _async_current_ids(self, include_ignore: bool = True) -> set[str | None]: """Return current unique IDs.""" return { entry.unique_id @@ -1042,7 +1042,7 @@ class ConfigFlow(data_entry_flow.FlowHandler): } @callback - def _async_in_progress(self) -> List[Dict]: + def _async_in_progress(self) -> list[dict]: """Return other in progress flows for current domain.""" return [ flw @@ -1050,18 +1050,18 @@ class ConfigFlow(data_entry_flow.FlowHandler): if flw["handler"] == self.handler and flw["flow_id"] != self.flow_id ] - async def async_step_ignore(self, user_input: Dict[str, Any]) -> Dict[str, Any]: + async def async_step_ignore(self, user_input: dict[str, Any]) -> dict[str, Any]: """Ignore this config flow.""" await self.async_set_unique_id(user_input["unique_id"], raise_on_progress=False) return self.async_create_entry(title=user_input["title"], data={}) - async def async_step_unignore(self, user_input: Dict[str, Any]) -> Dict[str, Any]: + async def async_step_unignore(self, user_input: dict[str, Any]) -> dict[str, Any]: """Rediscover a config entry by it's unique_id.""" return self.async_abort(reason="not_implemented") async def async_step_user( - self, user_input: Optional[Dict[str, Any]] = None - ) -> Dict[str, Any]: + self, user_input: dict[str, Any] | None = None + ) -> dict[str, Any]: """Handle a flow initiated by the user.""" return self.async_abort(reason="not_implemented") @@ -1090,16 +1090,16 @@ class ConfigFlow(data_entry_flow.FlowHandler): raise data_entry_flow.AbortFlow("already_in_progress") async def async_step_discovery( - self, discovery_info: Dict[str, Any] - ) -> Dict[str, Any]: + self, discovery_info: dict[str, Any] + ) -> dict[str, Any]: """Handle a flow initialized by discovery.""" await self._async_handle_discovery_without_unique_id() return await self.async_step_user() @callback def async_abort( - self, *, reason: str, description_placeholders: Optional[Dict] = None - ) -> Dict[str, Any]: + self, *, reason: str, description_placeholders: dict | None = None + ) -> dict[str, Any]: """Abort the config flow.""" # Remove reauth notification if no reauth flows are in progress if self.source == SOURCE_REAUTH and not any( @@ -1130,8 +1130,8 @@ class OptionsFlowManager(data_entry_flow.FlowManager): self, handler_key: Any, *, - context: Optional[Dict[str, Any]] = None, - data: Optional[Dict[str, Any]] = None, + context: dict[str, Any] | None = None, + data: dict[str, Any] | None = None, ) -> OptionsFlow: """Create an options flow for a config entry. @@ -1147,8 +1147,8 @@ class OptionsFlowManager(data_entry_flow.FlowManager): return cast(OptionsFlow, HANDLERS[entry.domain].async_get_options_flow(entry)) async def async_finish_flow( - self, flow: data_entry_flow.FlowHandler, result: Dict[str, Any] - ) -> Dict[str, Any]: + self, flow: data_entry_flow.FlowHandler, result: dict[str, Any] + ) -> dict[str, Any]: """Finish an options flow and update options for configuration entry. Flow.handler and entry_id is the same thing to map flow with entry. @@ -1184,7 +1184,7 @@ class SystemOptions: """Update properties.""" self.disable_new_entities = disable_new_entities - def as_dict(self) -> Dict[str, Any]: + def as_dict(self) -> dict[str, Any]: """Return dictionary version of this config entries system options.""" return {"disable_new_entities": self.disable_new_entities} @@ -1195,9 +1195,9 @@ class EntityRegistryDisabledHandler: def __init__(self, hass: HomeAssistant) -> None: """Initialize the handler.""" self.hass = hass - self.registry: Optional[entity_registry.EntityRegistry] = None - self.changed: Set[str] = set() - self._remove_call_later: Optional[Callable[[], None]] = None + self.registry: entity_registry.EntityRegistry | None = None + self.changed: set[str] = set() + self._remove_call_later: Callable[[], None] | None = None @callback def async_setup(self) -> None: diff --git a/homeassistant/core.py b/homeassistant/core.py index b7bb645be36..4e0c3c48283 100644 --- a/homeassistant/core.py +++ b/homeassistant/core.py @@ -4,6 +4,8 @@ Core components of Home Assistant. Home Assistant is a Home Automation framework for observing the state of entities and react to changes. """ +from __future__ import annotations + import asyncio import datetime import enum @@ -22,15 +24,10 @@ from typing import ( Callable, Collection, Coroutine, - Dict, Iterable, - List, Mapping, Optional, - Set, - Tuple, TypeVar, - Union, cast, ) @@ -119,7 +116,7 @@ TIMEOUT_EVENT_START = 15 _LOGGER = logging.getLogger(__name__) -def split_entity_id(entity_id: str) -> List[str]: +def split_entity_id(entity_id: str) -> list[str]: """Split a state entity ID into domain and object ID.""" return entity_id.split(".", 1) @@ -237,7 +234,7 @@ class HomeAssistant: self.state: CoreState = CoreState.not_running self.exit_code: int = 0 # If not None, use to signal end-of-loop - self._stopped: Optional[asyncio.Event] = None + self._stopped: asyncio.Event | None = None # Timeout handler for Core/Helper namespace self.timeout: TimeoutManager = TimeoutManager() @@ -342,7 +339,7 @@ class HomeAssistant: @callback def async_add_job( self, target: Callable[..., Any], *args: Any - ) -> Optional[asyncio.Future]: + ) -> asyncio.Future | None: """Add a job from within the event loop. This method must be run in the event loop. @@ -359,9 +356,7 @@ class HomeAssistant: return self.async_add_hass_job(HassJob(target), *args) @callback - def async_add_hass_job( - self, hassjob: HassJob, *args: Any - ) -> Optional[asyncio.Future]: + def async_add_hass_job(self, hassjob: HassJob, *args: Any) -> asyncio.Future | None: """Add a HassJob from within the event loop. This method must be run in the event loop. @@ -423,9 +418,7 @@ class HomeAssistant: self._track_task = False @callback - def async_run_hass_job( - self, hassjob: HassJob, *args: Any - ) -> Optional[asyncio.Future]: + def async_run_hass_job(self, hassjob: HassJob, *args: Any) -> asyncio.Future | None: """Run a HassJob from within the event loop. This method must be run in the event loop. @@ -441,8 +434,8 @@ class HomeAssistant: @callback def async_run_job( - self, target: Callable[..., Union[None, Awaitable]], *args: Any - ) -> Optional[asyncio.Future]: + self, target: Callable[..., None | Awaitable], *args: Any + ) -> asyncio.Future | None: """Run a job from within the event loop. This method must be run in the event loop. @@ -465,7 +458,7 @@ class HomeAssistant: """Block until all pending work is done.""" # To flush out any call_soon_threadsafe await asyncio.sleep(0) - start_time: Optional[float] = None + start_time: float | None = None while self._pending_tasks: pending = [task for task in self._pending_tasks if not task.done()] @@ -582,10 +575,10 @@ class Context: """The context that triggered something.""" user_id: str = attr.ib(default=None) - parent_id: Optional[str] = attr.ib(default=None) + parent_id: str | None = attr.ib(default=None) id: str = attr.ib(factory=uuid_util.random_uuid_hex) - def as_dict(self) -> Dict[str, Optional[str]]: + def as_dict(self) -> dict[str, str | None]: """Return a dictionary representation of the context.""" return {"id": self.id, "parent_id": self.parent_id, "user_id": self.user_id} @@ -610,10 +603,10 @@ class Event: def __init__( self, event_type: str, - data: Optional[Dict[str, Any]] = None, + data: dict[str, Any] | None = None, origin: EventOrigin = EventOrigin.local, - time_fired: Optional[datetime.datetime] = None, - context: Optional[Context] = None, + time_fired: datetime.datetime | None = None, + context: Context | None = None, ) -> None: """Initialize a new event.""" self.event_type = event_type @@ -627,7 +620,7 @@ class Event: # The only event type that shares context are the TIME_CHANGED return hash((self.event_type, self.context.id, self.time_fired)) - def as_dict(self) -> Dict[str, Any]: + def as_dict(self) -> dict[str, Any]: """Create a dict representation of this Event. Async friendly. @@ -664,11 +657,11 @@ class EventBus: def __init__(self, hass: HomeAssistant) -> None: """Initialize a new event bus.""" - self._listeners: Dict[str, List[Tuple[HassJob, Optional[Callable]]]] = {} + self._listeners: dict[str, list[tuple[HassJob, Callable | None]]] = {} self._hass = hass @callback - def async_listeners(self) -> Dict[str, int]: + def async_listeners(self) -> dict[str, int]: """Return dictionary with events and the number of listeners. This method must be run in the event loop. @@ -676,16 +669,16 @@ class EventBus: return {key: len(self._listeners[key]) for key in self._listeners} @property - def listeners(self) -> Dict[str, int]: + def listeners(self) -> dict[str, int]: """Return dictionary with events and the number of listeners.""" return run_callback_threadsafe(self._hass.loop, self.async_listeners).result() def fire( self, event_type: str, - event_data: Optional[Dict] = None, + event_data: dict | None = None, origin: EventOrigin = EventOrigin.local, - context: Optional[Context] = None, + context: Context | None = None, ) -> None: """Fire an event.""" self._hass.loop.call_soon_threadsafe( @@ -696,10 +689,10 @@ class EventBus: def async_fire( self, event_type: str, - event_data: Optional[Dict[str, Any]] = None, + event_data: dict[str, Any] | None = None, origin: EventOrigin = EventOrigin.local, - context: Optional[Context] = None, - time_fired: Optional[datetime.datetime] = None, + context: Context | None = None, + time_fired: datetime.datetime | None = None, ) -> None: """Fire an event. @@ -751,7 +744,7 @@ class EventBus: self, event_type: str, listener: Callable, - event_filter: Optional[Callable] = None, + event_filter: Callable | None = None, ) -> CALLBACK_TYPE: """Listen for all events or events of a specific type. @@ -772,7 +765,7 @@ class EventBus: @callback def _async_listen_filterable_job( - self, event_type: str, filterable_job: Tuple[HassJob, Optional[Callable]] + self, event_type: str, filterable_job: tuple[HassJob, Callable | None] ) -> CALLBACK_TYPE: self._listeners.setdefault(event_type, []).append(filterable_job) @@ -811,7 +804,7 @@ class EventBus: This method must be run in the event loop. """ - filterable_job: Optional[Tuple[HassJob, Optional[Callable]]] = None + filterable_job: tuple[HassJob, Callable | None] | None = None @callback def _onetime_listener(event: Event) -> None: @@ -835,7 +828,7 @@ class EventBus: @callback def _async_remove_listener( - self, event_type: str, filterable_job: Tuple[HassJob, Optional[Callable]] + self, event_type: str, filterable_job: tuple[HassJob, Callable | None] ) -> None: """Remove a listener of a specific event_type. @@ -884,11 +877,11 @@ class State: self, entity_id: str, state: str, - attributes: Optional[Mapping[str, Any]] = None, - last_changed: Optional[datetime.datetime] = None, - last_updated: Optional[datetime.datetime] = None, - context: Optional[Context] = None, - validate_entity_id: Optional[bool] = True, + attributes: Mapping[str, Any] | None = None, + last_changed: datetime.datetime | None = None, + last_updated: datetime.datetime | None = None, + context: Context | None = None, + validate_entity_id: bool | None = True, ) -> None: """Initialize a new state.""" state = str(state) @@ -912,7 +905,7 @@ class State: self.last_changed = last_changed or self.last_updated self.context = context or Context() self.domain, self.object_id = split_entity_id(self.entity_id) - self._as_dict: Optional[Dict[str, Collection[Any]]] = None + self._as_dict: dict[str, Collection[Any]] | None = None @property def name(self) -> str: @@ -921,7 +914,7 @@ class State: "_", " " ) - def as_dict(self) -> Dict: + def as_dict(self) -> dict: """Return a dict representation of the State. Async friendly. @@ -946,7 +939,7 @@ class State: return self._as_dict @classmethod - def from_dict(cls, json_dict: Dict) -> Any: + def from_dict(cls, json_dict: dict) -> Any: """Initialize a state from a dict. Async friendly. @@ -1004,12 +997,12 @@ class StateMachine: def __init__(self, bus: EventBus, loop: asyncio.events.AbstractEventLoop) -> None: """Initialize state machine.""" - self._states: Dict[str, State] = {} - self._reservations: Set[str] = set() + self._states: dict[str, State] = {} + self._reservations: set[str] = set() self._bus = bus self._loop = loop - def entity_ids(self, domain_filter: Optional[str] = None) -> List[str]: + def entity_ids(self, domain_filter: str | None = None) -> list[str]: """List of entity ids that are being tracked.""" future = run_callback_threadsafe( self._loop, self.async_entity_ids, domain_filter @@ -1018,8 +1011,8 @@ class StateMachine: @callback def async_entity_ids( - self, domain_filter: Optional[Union[str, Iterable]] = None - ) -> List[str]: + self, domain_filter: str | Iterable | None = None + ) -> list[str]: """List of entity ids that are being tracked. This method must be run in the event loop. @@ -1038,7 +1031,7 @@ class StateMachine: @callback def async_entity_ids_count( - self, domain_filter: Optional[Union[str, Iterable]] = None + self, domain_filter: str | Iterable | None = None ) -> int: """Count the entity ids that are being tracked. @@ -1054,16 +1047,14 @@ class StateMachine: [None for state in self._states.values() if state.domain in domain_filter] ) - def all(self, domain_filter: Optional[Union[str, Iterable]] = None) -> List[State]: + def all(self, domain_filter: str | Iterable | None = None) -> list[State]: """Create a list of all states.""" return run_callback_threadsafe( self._loop, self.async_all, domain_filter ).result() @callback - def async_all( - self, domain_filter: Optional[Union[str, Iterable]] = None - ) -> List[State]: + def async_all(self, domain_filter: str | Iterable | None = None) -> list[State]: """Create a list of all states matching the filter. This method must be run in the event loop. @@ -1078,7 +1069,7 @@ class StateMachine: state for state in self._states.values() if state.domain in domain_filter ] - def get(self, entity_id: str) -> Optional[State]: + def get(self, entity_id: str) -> State | None: """Retrieve state of entity_id or None if not found. Async friendly. @@ -1103,7 +1094,7 @@ class StateMachine: ).result() @callback - def async_remove(self, entity_id: str, context: Optional[Context] = None) -> bool: + def async_remove(self, entity_id: str, context: Context | None = None) -> bool: """Remove the state of an entity. Returns boolean to indicate if an entity was removed. @@ -1131,9 +1122,9 @@ class StateMachine: self, entity_id: str, new_state: str, - attributes: Optional[Mapping[str, Any]] = None, + attributes: Mapping[str, Any] | None = None, force_update: bool = False, - context: Optional[Context] = None, + context: Context | None = None, ) -> None: """Set the state of an entity, add entity if it does not exist. @@ -1180,9 +1171,9 @@ class StateMachine: self, entity_id: str, new_state: str, - attributes: Optional[Mapping[str, Any]] = None, + attributes: Mapping[str, Any] | None = None, force_update: bool = False, - context: Optional[Context] = None, + context: Context | None = None, ) -> None: """Set the state of an entity, add entity if it does not exist. @@ -1241,8 +1232,8 @@ class Service: def __init__( self, func: Callable, - schema: Optional[vol.Schema], - context: Optional[Context] = None, + schema: vol.Schema | None, + context: Context | None = None, ) -> None: """Initialize a service.""" self.job = HassJob(func) @@ -1258,8 +1249,8 @@ class ServiceCall: self, domain: str, service: str, - data: Optional[Dict] = None, - context: Optional[Context] = None, + data: dict | None = None, + context: Context | None = None, ) -> None: """Initialize a service call.""" self.domain = domain.lower() @@ -1283,16 +1274,16 @@ class ServiceRegistry: def __init__(self, hass: HomeAssistant) -> None: """Initialize a service registry.""" - self._services: Dict[str, Dict[str, Service]] = {} + self._services: dict[str, dict[str, Service]] = {} self._hass = hass @property - def services(self) -> Dict[str, Dict[str, Service]]: + def services(self) -> dict[str, dict[str, Service]]: """Return dictionary with per domain a list of available services.""" return run_callback_threadsafe(self._hass.loop, self.async_services).result() @callback - def async_services(self) -> Dict[str, Dict[str, Service]]: + def async_services(self) -> dict[str, dict[str, Service]]: """Return dictionary with per domain a list of available services. This method must be run in the event loop. @@ -1311,7 +1302,7 @@ class ServiceRegistry: domain: str, service: str, service_func: Callable, - schema: Optional[vol.Schema] = None, + schema: vol.Schema | None = None, ) -> None: """ Register a service. @@ -1328,7 +1319,7 @@ class ServiceRegistry: domain: str, service: str, service_func: Callable, - schema: Optional[vol.Schema] = None, + schema: vol.Schema | None = None, ) -> None: """ Register a service. @@ -1382,12 +1373,12 @@ class ServiceRegistry: self, domain: str, service: str, - service_data: Optional[Dict] = None, + service_data: dict | None = None, blocking: bool = False, - context: Optional[Context] = None, - limit: Optional[float] = SERVICE_CALL_LIMIT, - target: Optional[Dict] = None, - ) -> Optional[bool]: + context: Context | None = None, + limit: float | None = SERVICE_CALL_LIMIT, + target: dict | None = None, + ) -> bool | None: """ Call a service. @@ -1404,12 +1395,12 @@ class ServiceRegistry: self, domain: str, service: str, - service_data: Optional[Dict] = None, + service_data: dict | None = None, blocking: bool = False, - context: Optional[Context] = None, - limit: Optional[float] = SERVICE_CALL_LIMIT, - target: Optional[Dict] = None, - ) -> Optional[bool]: + context: Context | None = None, + limit: float | None = SERVICE_CALL_LIMIT, + target: dict | None = None, + ) -> bool | None: """ Call a service. @@ -1497,7 +1488,7 @@ class ServiceRegistry: return False def _run_service_in_background( - self, coro_or_task: Union[Coroutine, asyncio.Task], service_call: ServiceCall + self, coro_or_task: Coroutine | asyncio.Task, service_call: ServiceCall ) -> None: """Run service call in background, catching and logging any exceptions.""" @@ -1542,8 +1533,8 @@ class Config: self.location_name: str = "Home" self.time_zone: datetime.tzinfo = dt_util.UTC self.units: UnitSystem = METRIC_SYSTEM - self.internal_url: Optional[str] = None - self.external_url: Optional[str] = None + self.internal_url: str | None = None + self.external_url: str | None = None self.config_source: str = "default" @@ -1551,22 +1542,22 @@ class Config: self.skip_pip: bool = False # List of loaded components - self.components: Set[str] = set() + self.components: set[str] = set() # API (HTTP) server configuration, see components.http.ApiConfig - self.api: Optional[Any] = None + self.api: Any | None = None # Directory that holds the configuration - self.config_dir: Optional[str] = None + self.config_dir: str | None = None # List of allowed external dirs to access - self.allowlist_external_dirs: Set[str] = set() + self.allowlist_external_dirs: set[str] = set() # List of allowed external URLs that integrations may use - self.allowlist_external_urls: Set[str] = set() + self.allowlist_external_urls: set[str] = set() # Dictionary of Media folders that integrations may use - self.media_dirs: Dict[str, str] = {} + self.media_dirs: dict[str, str] = {} # If Home Assistant is running in safe mode self.safe_mode: bool = False @@ -1574,7 +1565,7 @@ class Config: # Use legacy template behavior self.legacy_templates: bool = False - def distance(self, lat: float, lon: float) -> Optional[float]: + def distance(self, lat: float, lon: float) -> float | None: """Calculate distance from Home Assistant. Async friendly. @@ -1625,7 +1616,7 @@ class Config: return False - def as_dict(self) -> Dict: + def as_dict(self) -> dict: """Create a dictionary representation of the configuration. Async friendly. @@ -1670,15 +1661,15 @@ class Config: self, *, source: str, - latitude: Optional[float] = None, - longitude: Optional[float] = None, - elevation: Optional[int] = None, - unit_system: Optional[str] = None, - location_name: Optional[str] = None, - time_zone: Optional[str] = None, + latitude: float | None = None, + longitude: float | None = None, + elevation: int | None = None, + unit_system: str | None = None, + location_name: str | None = None, + time_zone: str | None = None, # pylint: disable=dangerous-default-value # _UNDEFs not modified - external_url: Optional[Union[str, dict]] = _UNDEF, - internal_url: Optional[Union[str, dict]] = _UNDEF, + external_url: str | dict | None = _UNDEF, + internal_url: str | dict | None = _UNDEF, ) -> None: """Update the configuration from a dictionary.""" self.config_source = source diff --git a/homeassistant/data_entry_flow.py b/homeassistant/data_entry_flow.py index 15ad417c6ad..f16c1859fcd 100644 --- a/homeassistant/data_entry_flow.py +++ b/homeassistant/data_entry_flow.py @@ -4,7 +4,7 @@ from __future__ import annotations import abc import asyncio from types import MappingProxyType -from typing import Any, Dict, List, Optional +from typing import Any import uuid import voluptuous as vol @@ -43,7 +43,7 @@ class UnknownStep(FlowError): class AbortFlow(FlowError): """Exception to indicate a flow needs to be aborted.""" - def __init__(self, reason: str, description_placeholders: Optional[Dict] = None): + def __init__(self, reason: str, description_placeholders: dict | None = None): """Initialize an abort flow exception.""" super().__init__(f"Flow aborted: {reason}") self.reason = reason @@ -59,8 +59,8 @@ class FlowManager(abc.ABC): ) -> None: """Initialize the flow manager.""" self.hass = hass - self._initializing: Dict[str, List[asyncio.Future]] = {} - self._progress: Dict[str, Any] = {} + self._initializing: dict[str, list[asyncio.Future]] = {} + self._progress: dict[str, Any] = {} async def async_wait_init_flow_finish(self, handler: str) -> None: """Wait till all flows in progress are initialized.""" @@ -76,8 +76,8 @@ class FlowManager(abc.ABC): self, handler_key: Any, *, - context: Optional[Dict[str, Any]] = None, - data: Optional[Dict[str, Any]] = None, + context: dict[str, Any] | None = None, + data: dict[str, Any] | None = None, ) -> FlowHandler: """Create a flow for specified handler. @@ -86,17 +86,17 @@ class FlowManager(abc.ABC): @abc.abstractmethod async def async_finish_flow( - self, flow: "FlowHandler", result: Dict[str, Any] - ) -> Dict[str, Any]: + self, flow: "FlowHandler", result: dict[str, Any] + ) -> dict[str, Any]: """Finish a config flow and add an entry.""" async def async_post_init( - self, flow: "FlowHandler", result: Dict[str, Any] + self, flow: "FlowHandler", result: dict[str, Any] ) -> None: """Entry has finished executing its first step asynchronously.""" @callback - def async_progress(self) -> List[Dict]: + def async_progress(self) -> list[dict]: """Return the flows in progress.""" return [ { @@ -110,7 +110,7 @@ class FlowManager(abc.ABC): ] async def async_init( - self, handler: str, *, context: Optional[Dict] = None, data: Any = None + self, handler: str, *, context: dict | None = None, data: Any = None ) -> Any: """Start a configuration flow.""" if context is None: @@ -142,7 +142,7 @@ class FlowManager(abc.ABC): return result async def async_configure( - self, flow_id: str, user_input: Optional[Dict] = None + self, flow_id: str, user_input: dict | None = None ) -> Any: """Continue a configuration flow.""" flow = self._progress.get(flow_id) @@ -198,9 +198,9 @@ class FlowManager(abc.ABC): self, flow: Any, step_id: str, - user_input: Optional[Dict], - step_done: Optional[asyncio.Future] = None, - ) -> Dict: + user_input: dict | None, + step_done: asyncio.Future | None = None, + ) -> dict: """Handle a step of a flow.""" method = f"async_step_{step_id}" @@ -213,7 +213,7 @@ class FlowManager(abc.ABC): ) try: - result: Dict = await getattr(flow, method)(user_input) + result: dict = await getattr(flow, method)(user_input) except AbortFlow as err: result = _create_abort_data( flow.flow_id, flow.handler, err.reason, err.description_placeholders @@ -265,13 +265,13 @@ class FlowHandler: """Handle the configuration flow of a component.""" # Set by flow manager - cur_step: Optional[Dict[str, str]] = None + cur_step: dict[str, str] | None = None # Ignore types: https://github.com/PyCQA/pylint/issues/3167 flow_id: str = None # type: ignore hass: HomeAssistant = None # type: ignore handler: str = None # type: ignore # Ensure the attribute has a subscriptable, but immutable, default value. - context: Dict = MappingProxyType({}) # type: ignore + context: dict = MappingProxyType({}) # type: ignore # Set by _async_create_flow callback init_step = "init" @@ -280,7 +280,7 @@ class FlowHandler: VERSION = 1 @property - def source(self) -> Optional[str]: + def source(self) -> str | None: """Source that initialized the flow.""" if not hasattr(self, "context"): return None @@ -301,9 +301,9 @@ class FlowHandler: *, step_id: str, data_schema: vol.Schema = None, - errors: Optional[Dict] = None, - description_placeholders: Optional[Dict] = None, - ) -> Dict[str, Any]: + errors: dict | None = None, + description_placeholders: dict | None = None, + ) -> dict[str, Any]: """Return the definition of a form to gather user input.""" return { "type": RESULT_TYPE_FORM, @@ -320,10 +320,10 @@ class FlowHandler: self, *, title: str, - data: Dict, - description: Optional[str] = None, - description_placeholders: Optional[Dict] = None, - ) -> Dict[str, Any]: + data: dict, + description: str | None = None, + description_placeholders: dict | None = None, + ) -> dict[str, Any]: """Finish config flow and create a config entry.""" return { "version": self.VERSION, @@ -338,8 +338,8 @@ class FlowHandler: @callback def async_abort( - self, *, reason: str, description_placeholders: Optional[Dict] = None - ) -> Dict[str, Any]: + self, *, reason: str, description_placeholders: dict | None = None + ) -> dict[str, Any]: """Abort the config flow.""" return _create_abort_data( self.flow_id, self.handler, reason, description_placeholders @@ -347,8 +347,8 @@ class FlowHandler: @callback def async_external_step( - self, *, step_id: str, url: str, description_placeholders: Optional[Dict] = None - ) -> Dict[str, Any]: + self, *, step_id: str, url: str, description_placeholders: dict | None = None + ) -> dict[str, Any]: """Return the definition of an external step for the user to take.""" return { "type": RESULT_TYPE_EXTERNAL_STEP, @@ -360,7 +360,7 @@ class FlowHandler: } @callback - def async_external_step_done(self, *, next_step_id: str) -> Dict[str, Any]: + def async_external_step_done(self, *, next_step_id: str) -> dict[str, Any]: """Return the definition of an external step for the user to take.""" return { "type": RESULT_TYPE_EXTERNAL_STEP_DONE, @@ -375,8 +375,8 @@ class FlowHandler: *, step_id: str, progress_action: str, - description_placeholders: Optional[Dict] = None, - ) -> Dict[str, Any]: + description_placeholders: dict | None = None, + ) -> dict[str, Any]: """Show a progress message to the user, without user input allowed.""" return { "type": RESULT_TYPE_SHOW_PROGRESS, @@ -388,7 +388,7 @@ class FlowHandler: } @callback - def async_show_progress_done(self, *, next_step_id: str) -> Dict[str, Any]: + def async_show_progress_done(self, *, next_step_id: str) -> dict[str, Any]: """Mark the progress done.""" return { "type": RESULT_TYPE_SHOW_PROGRESS_DONE, @@ -403,8 +403,8 @@ def _create_abort_data( flow_id: str, handler: str, reason: str, - description_placeholders: Optional[Dict] = None, -) -> Dict[str, Any]: + description_placeholders: dict | None = None, +) -> dict[str, Any]: """Return the definition of an external step for the user to take.""" return { "type": RESULT_TYPE_ABORT, diff --git a/homeassistant/exceptions.py b/homeassistant/exceptions.py index 9a7b3da0f1d..499d4052849 100644 --- a/homeassistant/exceptions.py +++ b/homeassistant/exceptions.py @@ -1,5 +1,7 @@ """The exceptions used by Home Assistant.""" -from typing import TYPE_CHECKING, Generator, Optional, Sequence +from __future__ import annotations + +from typing import TYPE_CHECKING, Generator, Sequence import attr @@ -113,12 +115,12 @@ class Unauthorized(HomeAssistantError): def __init__( self, - context: Optional["Context"] = None, - user_id: Optional[str] = None, - entity_id: Optional[str] = None, - config_entry_id: Optional[str] = None, - perm_category: Optional[str] = None, - permission: Optional[str] = None, + context: "Context" | None = None, + user_id: str | None = None, + entity_id: str | None = None, + config_entry_id: str | None = None, + perm_category: str | None = None, + permission: str | None = None, ) -> None: """Unauthorized error.""" super().__init__(self.__class__.__name__) diff --git a/homeassistant/loader.py b/homeassistant/loader.py index 2ae279da79e..f6cb24698ec 100644 --- a/homeassistant/loader.py +++ b/homeassistant/loader.py @@ -14,19 +14,7 @@ import logging import pathlib import sys from types import ModuleType -from typing import ( - TYPE_CHECKING, - Any, - Callable, - Dict, - List, - Optional, - Set, - TypedDict, - TypeVar, - Union, - cast, -) +from typing import TYPE_CHECKING, Any, Callable, Dict, TypedDict, TypeVar, cast from awesomeversion import AwesomeVersion, AwesomeVersionStrategy @@ -86,21 +74,21 @@ class Manifest(TypedDict, total=False): name: str disabled: str domain: str - dependencies: List[str] - after_dependencies: List[str] - requirements: List[str] + dependencies: list[str] + after_dependencies: list[str] + requirements: list[str] config_flow: bool documentation: str issue_tracker: str quality_scale: str - mqtt: List[str] - ssdp: List[Dict[str, str]] - zeroconf: List[Union[str, Dict[str, str]]] - dhcp: List[Dict[str, str]] - homekit: Dict[str, List[str]] + mqtt: list[str] + ssdp: list[dict[str, str]] + zeroconf: list[str | dict[str, str]] + dhcp: list[dict[str, str]] + homekit: dict[str, list[str]] is_built_in: bool version: str - codeowners: List[str] + codeowners: list[str] def manifest_from_legacy_module(domain: str, module: ModuleType) -> Manifest: @@ -116,7 +104,7 @@ def manifest_from_legacy_module(domain: str, module: ModuleType) -> Manifest: async def _async_get_custom_components( hass: "HomeAssistant", -) -> Dict[str, Integration]: +) -> dict[str, Integration]: """Return list of custom integrations.""" if hass.config.safe_mode: return {} @@ -126,7 +114,7 @@ async def _async_get_custom_components( except ImportError: return {} - def get_sub_directories(paths: List[str]) -> List[pathlib.Path]: + def get_sub_directories(paths: list[str]) -> list[pathlib.Path]: """Return all sub directories in a set of paths.""" return [ entry @@ -157,7 +145,7 @@ async def _async_get_custom_components( async def async_get_custom_components( hass: "HomeAssistant", -) -> Dict[str, Integration]: +) -> dict[str, Integration]: """Return cached list of custom integrations.""" reg_or_evt = hass.data.get(DATA_CUSTOM_COMPONENTS) @@ -177,12 +165,12 @@ async def async_get_custom_components( return cast(Dict[str, "Integration"], reg_or_evt) -async def async_get_config_flows(hass: HomeAssistant) -> Set[str]: +async def async_get_config_flows(hass: HomeAssistant) -> set[str]: """Return cached list of config flows.""" # pylint: disable=import-outside-toplevel from homeassistant.generated.config_flows import FLOWS - flows: Set[str] = set() + flows: set[str] = set() flows.update(FLOWS) integrations = await async_get_custom_components(hass) @@ -197,9 +185,9 @@ async def async_get_config_flows(hass: HomeAssistant) -> Set[str]: return flows -async def async_get_zeroconf(hass: HomeAssistant) -> Dict[str, List[Dict[str, str]]]: +async def async_get_zeroconf(hass: HomeAssistant) -> dict[str, list[dict[str, str]]]: """Return cached list of zeroconf types.""" - zeroconf: Dict[str, List[Dict[str, str]]] = ZEROCONF.copy() + zeroconf: dict[str, list[dict[str, str]]] = ZEROCONF.copy() integrations = await async_get_custom_components(hass) for integration in integrations.values(): @@ -220,9 +208,9 @@ async def async_get_zeroconf(hass: HomeAssistant) -> Dict[str, List[Dict[str, st return zeroconf -async def async_get_dhcp(hass: HomeAssistant) -> List[Dict[str, str]]: +async def async_get_dhcp(hass: HomeAssistant) -> list[dict[str, str]]: """Return cached list of dhcp types.""" - dhcp: List[Dict[str, str]] = DHCP.copy() + dhcp: list[dict[str, str]] = DHCP.copy() integrations = await async_get_custom_components(hass) for integration in integrations.values(): @@ -234,10 +222,10 @@ async def async_get_dhcp(hass: HomeAssistant) -> List[Dict[str, str]]: return dhcp -async def async_get_homekit(hass: HomeAssistant) -> Dict[str, str]: +async def async_get_homekit(hass: HomeAssistant) -> dict[str, str]: """Return cached list of homekit models.""" - homekit: Dict[str, str] = HOMEKIT.copy() + homekit: dict[str, str] = HOMEKIT.copy() integrations = await async_get_custom_components(hass) for integration in integrations.values(): @@ -253,10 +241,10 @@ async def async_get_homekit(hass: HomeAssistant) -> Dict[str, str]: return homekit -async def async_get_ssdp(hass: HomeAssistant) -> Dict[str, List[Dict[str, str]]]: +async def async_get_ssdp(hass: HomeAssistant) -> dict[str, list[dict[str, str]]]: """Return cached list of ssdp mappings.""" - ssdp: Dict[str, List[Dict[str, str]]] = SSDP.copy() + ssdp: dict[str, list[dict[str, str]]] = SSDP.copy() integrations = await async_get_custom_components(hass) for integration in integrations.values(): @@ -268,10 +256,10 @@ async def async_get_ssdp(hass: HomeAssistant) -> Dict[str, List[Dict[str, str]]] return ssdp -async def async_get_mqtt(hass: HomeAssistant) -> Dict[str, List[str]]: +async def async_get_mqtt(hass: HomeAssistant) -> dict[str, list[str]]: """Return cached list of MQTT mappings.""" - mqtt: Dict[str, List[str]] = MQTT.copy() + mqtt: dict[str, list[str]] = MQTT.copy() integrations = await async_get_custom_components(hass) for integration in integrations.values(): @@ -289,7 +277,7 @@ class Integration: @classmethod def resolve_from_root( cls, hass: "HomeAssistant", root_module: ModuleType, domain: str - ) -> Optional[Integration]: + ) -> Integration | None: """Resolve an integration from a root module.""" for base in root_module.__path__: # type: ignore manifest_path = pathlib.Path(base) / domain / "manifest.json" @@ -312,9 +300,7 @@ class Integration: return None @classmethod - def resolve_legacy( - cls, hass: "HomeAssistant", domain: str - ) -> Optional[Integration]: + def resolve_legacy(cls, hass: "HomeAssistant", domain: str) -> Integration | None: """Resolve legacy component. Will create a stub manifest. @@ -346,8 +332,8 @@ class Integration: manifest["is_built_in"] = self.is_built_in if self.dependencies: - self._all_dependencies_resolved: Optional[bool] = None - self._all_dependencies: Optional[Set[str]] = None + self._all_dependencies_resolved: bool | None = None + self._all_dependencies: set[str] | None = None else: self._all_dependencies_resolved = True self._all_dependencies = set() @@ -360,7 +346,7 @@ class Integration: return self.manifest["name"] @property - def disabled(self) -> Optional[str]: + def disabled(self) -> str | None: """Return reason integration is disabled.""" return self.manifest.get("disabled") @@ -370,17 +356,17 @@ class Integration: return self.manifest["domain"] @property - def dependencies(self) -> List[str]: + def dependencies(self) -> list[str]: """Return dependencies.""" return self.manifest.get("dependencies", []) @property - def after_dependencies(self) -> List[str]: + def after_dependencies(self) -> list[str]: """Return after_dependencies.""" return self.manifest.get("after_dependencies", []) @property - def requirements(self) -> List[str]: + def requirements(self) -> list[str]: """Return requirements.""" return self.manifest.get("requirements", []) @@ -390,42 +376,42 @@ class Integration: return self.manifest.get("config_flow") or False @property - def documentation(self) -> Optional[str]: + def documentation(self) -> str | None: """Return documentation.""" return self.manifest.get("documentation") @property - def issue_tracker(self) -> Optional[str]: + def issue_tracker(self) -> str | None: """Return issue tracker link.""" return self.manifest.get("issue_tracker") @property - def quality_scale(self) -> Optional[str]: + def quality_scale(self) -> str | None: """Return Integration Quality Scale.""" return self.manifest.get("quality_scale") @property - def mqtt(self) -> Optional[List[str]]: + def mqtt(self) -> list[str] | None: """Return Integration MQTT entries.""" return self.manifest.get("mqtt") @property - def ssdp(self) -> Optional[List[Dict[str, str]]]: + def ssdp(self) -> list[dict[str, str]] | None: """Return Integration SSDP entries.""" return self.manifest.get("ssdp") @property - def zeroconf(self) -> Optional[List[Union[str, Dict[str, str]]]]: + def zeroconf(self) -> list[str | dict[str, str]] | None: """Return Integration zeroconf entries.""" return self.manifest.get("zeroconf") @property - def dhcp(self) -> Optional[List[Dict[str, str]]]: + def dhcp(self) -> list[dict[str, str]] | None: """Return Integration dhcp entries.""" return self.manifest.get("dhcp") @property - def homekit(self) -> Optional[Dict[str, List[str]]]: + def homekit(self) -> dict[str, list[str]] | None: """Return Integration homekit entries.""" return self.manifest.get("homekit") @@ -435,14 +421,14 @@ class Integration: return self.pkg_path.startswith(PACKAGE_BUILTIN) @property - def version(self) -> Optional[AwesomeVersion]: + def version(self) -> AwesomeVersion | None: """Return the version of the integration.""" if "version" not in self.manifest: return None return AwesomeVersion(self.manifest["version"]) @property - def all_dependencies(self) -> Set[str]: + def all_dependencies(self) -> set[str]: """Return all dependencies including sub-dependencies.""" if self._all_dependencies is None: raise RuntimeError("Dependencies not resolved!") @@ -516,7 +502,7 @@ async def async_get_integration(hass: "HomeAssistant", domain: str) -> Integrati raise IntegrationNotFound(domain) cache = hass.data[DATA_INTEGRATIONS] = {} - int_or_evt: Union[Integration, asyncio.Event, None] = cache.get(domain, _UNDEF) + int_or_evt: Integration | asyncio.Event | None = cache.get(domain, _UNDEF) if isinstance(int_or_evt, asyncio.Event): await int_or_evt.wait() @@ -593,8 +579,8 @@ class CircularDependency(LoaderError): def _load_file( - hass: "HomeAssistant", comp_or_platform: str, base_paths: List[str] -) -> Optional[ModuleType]: + hass: "HomeAssistant", comp_or_platform: str, base_paths: list[str] +) -> ModuleType | None: """Try to load specified file. Looks in config dir first, then built-in components. @@ -683,7 +669,7 @@ class Components: integration = self._hass.data.get(DATA_INTEGRATIONS, {}).get(comp_name) if isinstance(integration, Integration): - component: Optional[ModuleType] = integration.get_component() + component: ModuleType | None = integration.get_component() else: # Fallback to importing old-school component = _load_file(self._hass, comp_name, _lookup_path(self._hass)) @@ -721,9 +707,9 @@ async def _async_component_dependencies( hass: "HomeAssistant", start_domain: str, integration: Integration, - loaded: Set[str], - loading: Set[str], -) -> Set[str]: + loaded: set[str], + loading: set[str], +) -> set[str]: """Recursive function to get component dependencies. Async friendly. @@ -773,7 +759,7 @@ def _async_mount_config_dir(hass: HomeAssistant) -> bool: return True -def _lookup_path(hass: HomeAssistant) -> List[str]: +def _lookup_path(hass: HomeAssistant) -> list[str]: """Return the lookup paths for legacy lookups.""" if hass.config.safe_mode: return [PACKAGE_BUILTIN] diff --git a/homeassistant/requirements.py b/homeassistant/requirements.py index 6ea2074ddf4..f073fd13df8 100644 --- a/homeassistant/requirements.py +++ b/homeassistant/requirements.py @@ -1,7 +1,9 @@ """Module to handle installing requirements.""" +from __future__ import annotations + import asyncio import os -from typing import Any, Dict, Iterable, List, Optional, Set, Union, cast +from typing import Any, Iterable, cast from homeassistant.core import HomeAssistant from homeassistant.exceptions import HomeAssistantError @@ -15,7 +17,7 @@ DATA_PIP_LOCK = "pip_lock" DATA_PKG_CACHE = "pkg_cache" DATA_INTEGRATIONS_WITH_REQS = "integrations_with_reqs" CONSTRAINT_FILE = "package_constraints.txt" -DISCOVERY_INTEGRATIONS: Dict[str, Iterable[str]] = { +DISCOVERY_INTEGRATIONS: dict[str, Iterable[str]] = { "dhcp": ("dhcp",), "mqtt": ("mqtt",), "ssdp": ("ssdp",), @@ -26,7 +28,7 @@ DISCOVERY_INTEGRATIONS: Dict[str, Iterable[str]] = { class RequirementsNotFound(HomeAssistantError): """Raised when a component is not found.""" - def __init__(self, domain: str, requirements: List[str]) -> None: + def __init__(self, domain: str, requirements: list[str]) -> None: """Initialize a component not found error.""" super().__init__(f"Requirements for {domain} not found: {requirements}.") self.domain = domain @@ -34,7 +36,7 @@ class RequirementsNotFound(HomeAssistantError): async def async_get_integration_with_requirements( - hass: HomeAssistant, domain: str, done: Optional[Set[str]] = None + hass: HomeAssistant, domain: str, done: set[str] | None = None ) -> Integration: """Get an integration with all requirements installed, including the dependencies. @@ -56,7 +58,7 @@ async def async_get_integration_with_requirements( if cache is None: cache = hass.data[DATA_INTEGRATIONS_WITH_REQS] = {} - int_or_evt: Union[Integration, asyncio.Event, None, UndefinedType] = cache.get( + int_or_evt: Integration | asyncio.Event | None | UndefinedType = cache.get( domain, UNDEFINED ) @@ -108,7 +110,7 @@ async def async_get_integration_with_requirements( async def async_process_requirements( - hass: HomeAssistant, name: str, requirements: List[str] + hass: HomeAssistant, name: str, requirements: list[str] ) -> None: """Install the requirements for a component or platform. @@ -126,7 +128,7 @@ async def async_process_requirements( if pkg_util.is_installed(req): continue - def _install(req: str, kwargs: Dict[str, Any]) -> bool: + def _install(req: str, kwargs: dict[str, Any]) -> bool: """Install requirement.""" return pkg_util.install_package(req, **kwargs) @@ -136,7 +138,7 @@ async def async_process_requirements( raise RequirementsNotFound(name, [req]) -def pip_kwargs(config_dir: Optional[str]) -> Dict[str, Any]: +def pip_kwargs(config_dir: str | None) -> dict[str, Any]: """Return keyword arguments for PIP install.""" is_docker = pkg_util.is_docker_env() kwargs = { diff --git a/homeassistant/runner.py b/homeassistant/runner.py index 6f13a47be81..5adddb5f6ef 100644 --- a/homeassistant/runner.py +++ b/homeassistant/runner.py @@ -1,9 +1,11 @@ """Run Home Assistant.""" +from __future__ import annotations + import asyncio from concurrent.futures import ThreadPoolExecutor import dataclasses import logging -from typing import Any, Dict, Optional +from typing import Any from homeassistant import bootstrap from homeassistant.core import callback @@ -34,8 +36,8 @@ class RuntimeConfig: verbose: bool = False - log_rotate_days: Optional[int] = None - log_file: Optional[str] = None + log_rotate_days: int | None = None + log_file: str | None = None log_no_color: bool = False debug: bool = False @@ -83,7 +85,7 @@ class HassEventLoopPolicy(asyncio.DefaultEventLoopPolicy): # type: ignore[valid @callback -def _async_loop_exception_handler(_: Any, context: Dict[str, Any]) -> None: +def _async_loop_exception_handler(_: Any, context: dict[str, Any]) -> None: """Handle all exception inside the core loop.""" kwargs = {} exception = context.get("exception") diff --git a/homeassistant/setup.py b/homeassistant/setup.py index 4060b0410d6..4c5e10a254b 100644 --- a/homeassistant/setup.py +++ b/homeassistant/setup.py @@ -1,9 +1,11 @@ """All methods needed to bootstrap a Home Assistant instance.""" +from __future__ import annotations + import asyncio import logging.handlers from timeit import default_timer as timer from types import ModuleType -from typing import Awaitable, Callable, Optional, Set +from typing import Awaitable, Callable from homeassistant import config as conf_util, core, loader, requirements from homeassistant.config import async_notify_setup_error @@ -26,7 +28,7 @@ SLOW_SETUP_MAX_WAIT = 300 @core.callback -def async_set_domains_to_be_loaded(hass: core.HomeAssistant, domains: Set[str]) -> None: +def async_set_domains_to_be_loaded(hass: core.HomeAssistant, domains: set[str]) -> None: """Set domains that are going to be loaded from the config. This will allow us to properly handle after_dependencies. @@ -133,7 +135,7 @@ async def _async_setup_component( This method is a coroutine. """ - def log_error(msg: str, link: Optional[str] = None) -> None: + def log_error(msg: str, link: str | None = None) -> None: """Log helper.""" _LOGGER.error("Setup failed for %s: %s", domain, msg) async_notify_setup_error(hass, domain, link) @@ -268,7 +270,7 @@ async def _async_setup_component( async def async_prepare_setup_platform( hass: core.HomeAssistant, hass_config: ConfigType, domain: str, platform_name: str -) -> Optional[ModuleType]: +) -> ModuleType | None: """Load a platform and makes sure dependencies are setup. This method is a coroutine.