Helpers type hint improvements (#44964)

This commit is contained in:
Ville Skyttä 2021-01-09 01:08:34 +02:00 committed by GitHub
parent 3569d92385
commit 3a88a4120e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 47 additions and 45 deletions

View file

@ -219,7 +219,7 @@ class _ScriptRun:
self._stop = asyncio.Event()
self._stopped = asyncio.Event()
def _changed(self):
def _changed(self) -> None:
if not self._stop.is_set():
self._script._changed() # pylint: disable=protected-access
@ -227,7 +227,7 @@ class _ScriptRun:
# pylint: disable=protected-access
return await self._script._async_get_condition(config)
def _log(self, msg, *args, level=logging.INFO):
def _log(self, msg: str, *args: Any, level: int = logging.INFO) -> None:
self._script._log(msg, *args, level=level) # pylint: disable=protected-access
async def async_run(self) -> None:
@ -257,7 +257,7 @@ class _ScriptRun:
self._log_exception(ex)
raise
def _finish(self):
def _finish(self) -> None:
self._script._runs.remove(self) # pylint: disable=protected-access
if not self._script.is_running:
self._script.last_action = None
@ -389,7 +389,7 @@ class _ScriptRun:
async def _async_run_long_action(self, long_task):
"""Run a long task while monitoring for stop request."""
async def async_cancel_long_task():
async def async_cancel_long_task() -> None:
# Stop long task and wait for it to finish.
long_task.cancel()
try:
@ -586,7 +586,7 @@ class _ScriptRun:
else:
del self._variables["repeat"]
async def _async_choose_step(self):
async def _async_choose_step(self) -> None:
"""Choose a sequence."""
# pylint: disable=protected-access
choose_data = await self._script._async_get_choose_data(self._step)
@ -706,7 +706,7 @@ class _QueuedScriptRun(_ScriptRun):
else:
await super().async_run()
def _finish(self):
def _finish(self) -> None:
# pylint: disable=protected-access
if self.lock_acquired:
self._script._queue_lck.release()
@ -868,7 +868,7 @@ class Script:
if choose_data["default"]:
choose_data["default"].update_logger(self._logger)
def _changed(self):
def _changed(self) -> None:
if self._change_listener_job:
self._hass.async_run_hass_job(self._change_listener_job)
@ -898,7 +898,7 @@ class Script:
if self._referenced_devices is not None:
return self._referenced_devices
referenced = set()
referenced: Set[str] = set()
for step in self.sequence:
action = cv.determine_script_action(step)
@ -927,7 +927,7 @@ class Script:
if self._referenced_entities is not None:
return self._referenced_entities
referenced = set()
referenced: Set[str] = set()
for step in self.sequence:
action = cv.determine_script_action(step)
@ -1128,9 +1128,9 @@ class Script:
self._choose_data[step] = choose_data
return choose_data
def _log(self, msg, *args, level=logging.INFO):
def _log(self, msg: str, *args: Any, level: int = logging.INFO) -> None:
msg = f"%s: {msg}"
args = [self.name, *args]
args = (self.name, *args)
if level == _LOG_EXCEPTION:
self._logger.exception(msg, *args)