Update pylint to 2.8.0 (#49637)

This commit is contained in:
Marc Mueller 2021-04-25 02:39:24 +02:00 committed by GitHub
parent 28eaa67986
commit f1d48ddfe3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
32 changed files with 114 additions and 114 deletions

View file

@ -145,6 +145,7 @@ def daemonize() -> None:
sys.exit(0) sys.exit(0)
# redirect standard file descriptors to devnull # redirect standard file descriptors to devnull
# pylint: disable=consider-using-with
infd = open(os.devnull) infd = open(os.devnull)
outfd = open(os.devnull, "a+") outfd = open(os.devnull, "a+")
sys.stdout.flush() sys.stdout.flush()

View file

@ -1374,10 +1374,7 @@ async def async_api_seek(hass, config, directive, context):
msg = f"{entity} did not return the current media position." msg = f"{entity} did not return the current media position."
raise AlexaVideoActionNotPermittedForContentError(msg) raise AlexaVideoActionNotPermittedForContentError(msg)
seek_position = int(current_position) + int(position_delta / 1000) seek_position = max(int(current_position) + int(position_delta / 1000), 0)
if seek_position < 0:
seek_position = 0
media_duration = entity.attributes.get(media_player.ATTR_MEDIA_DURATION) media_duration = entity.attributes.get(media_player.ATTR_MEDIA_DURATION)
if media_duration and 0 < int(media_duration) < seek_position: if media_duration and 0 < int(media_duration) < seek_position:

View file

@ -7,6 +7,7 @@ import voluptuous as vol
from homeassistant.components.notify import PLATFORM_SCHEMA, BaseNotificationService from homeassistant.components.notify import PLATFORM_SCHEMA, BaseNotificationService
from homeassistant.const import CONF_COMMAND, CONF_NAME from homeassistant.const import CONF_COMMAND, CONF_NAME
import homeassistant.helpers.config_validation as cv import homeassistant.helpers.config_validation as cv
from homeassistant.util.process import kill_subprocess
from .const import CONF_COMMAND_TIMEOUT, DEFAULT_TIMEOUT from .const import CONF_COMMAND_TIMEOUT, DEFAULT_TIMEOUT
@ -39,17 +40,18 @@ class CommandLineNotificationService(BaseNotificationService):
def send_message(self, message="", **kwargs): def send_message(self, message="", **kwargs):
"""Send a message to a command line.""" """Send a message to a command line."""
try: with subprocess.Popen(
proc = subprocess.Popen( self.command,
self.command, universal_newlines=True,
universal_newlines=True, stdin=subprocess.PIPE,
stdin=subprocess.PIPE, shell=True, # nosec # shell by design
shell=True, # nosec # shell by design ) as proc:
) try:
proc.communicate(input=message, timeout=self._timeout) proc.communicate(input=message, timeout=self._timeout)
if proc.returncode != 0: if proc.returncode != 0:
_LOGGER.error("Command failed: %s", self.command) _LOGGER.error("Command failed: %s", self.command)
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
_LOGGER.error("Timeout for command: %s", self.command) _LOGGER.error("Timeout for command: %s", self.command)
except subprocess.SubprocessError: kill_subprocess(proc)
_LOGGER.error("Error trying to exec command: %s", self.command) except subprocess.SubprocessError:
_LOGGER.error("Error trying to exec command: %s", self.command)

View file

@ -153,7 +153,7 @@ class DenonDevice(MediaPlayerEntity):
) )
self._available = True self._available = True
def async_log_errors( # pylint: disable=no-self-argument def async_log_errors(
func: Coroutine, func: Coroutine,
) -> Coroutine: ) -> Coroutine:
""" """
@ -168,7 +168,7 @@ class DenonDevice(MediaPlayerEntity):
# pylint: disable=protected-access # pylint: disable=protected-access
available = True available = True
try: try:
return await func(self, *args, **kwargs) # pylint: disable=not-callable return await func(self, *args, **kwargs)
except AvrTimoutError: except AvrTimoutError:
available = False available = False
if self._available is True: if self._available is True:
@ -203,7 +203,7 @@ class DenonDevice(MediaPlayerEntity):
_LOGGER.error( _LOGGER.error(
"Error %s occurred in method %s for Denon AVR receiver", "Error %s occurred in method %s for Denon AVR receiver",
err, err,
func.__name__, # pylint: disable=no-member func.__name__,
exc_info=True, exc_info=True,
) )
finally: finally:

View file

@ -7,12 +7,7 @@ from homeassistant.core import callback
from homeassistant.helpers.typing import DiscoveryInfoType from homeassistant.helpers.typing import DiscoveryInfoType
from . import configure_mydevolo from . import configure_mydevolo
from .const import ( # pylint:disable=unused-import from .const import CONF_MYDEVOLO, DEFAULT_MYDEVOLO, DOMAIN, SUPPORTED_MODEL_TYPES
CONF_MYDEVOLO,
DEFAULT_MYDEVOLO,
DOMAIN,
SUPPORTED_MODEL_TYPES,
)
from .exceptions import CredentialsInvalid from .exceptions import CredentialsInvalid

View file

@ -289,6 +289,7 @@ class HangoutsBot:
uri = data.get("image_file") uri = data.get("image_file")
if self.hass.config.is_allowed_path(uri): if self.hass.config.is_allowed_path(uri):
try: try:
# pylint: disable=consider-using-with
image_file = open(uri, "rb") image_file = open(uri, "rb")
except OSError as error: except OSError as error:
_LOGGER.error( _LOGGER.error(

View file

@ -6,7 +6,6 @@ from homeassistant import config_entries
from homeassistant.const import CONF_ELEVATION, CONF_LATITUDE, CONF_LONGITUDE, CONF_NAME from homeassistant.const import CONF_ELEVATION, CONF_LATITUDE, CONF_LONGITUDE, CONF_NAME
import homeassistant.helpers.config_validation as cv import homeassistant.helpers.config_validation as cv
# pylint:disable=unused-import
from .const import DOMAIN, HOME_LOCATION_NAME from .const import DOMAIN, HOME_LOCATION_NAME

View file

@ -21,7 +21,7 @@ from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.typing import ConfigType from homeassistant.helpers.typing import ConfigType
from . import create_motioneye_client from . import create_motioneye_client
from .const import ( # pylint:disable=unused-import from .const import (
CONF_ADMIN_PASSWORD, CONF_ADMIN_PASSWORD,
CONF_ADMIN_USERNAME, CONF_ADMIN_USERNAME,
CONF_CONFIG_ENTRY, CONF_CONFIG_ENTRY,

View file

@ -266,7 +266,7 @@ class NFAndroidTVNotificationService(BaseNotificationService):
if local_path is not None: if local_path is not None:
# Check whether path is whitelisted in configuration.yaml # Check whether path is whitelisted in configuration.yaml
if self.is_allowed_path(local_path): if self.is_allowed_path(local_path):
return open(local_path, "rb") return open(local_path, "rb") # pylint: disable=consider-using-with
_LOGGER.warning("'%s' is not secure to load data from!", local_path) _LOGGER.warning("'%s' is not secure to load data from!", local_path)
else: else:
_LOGGER.warning("Neither URL nor local path found in params!") _LOGGER.warning("Neither URL nor local path found in params!")

View file

@ -183,7 +183,6 @@ class OpenAlprLocalEntity(ImageProcessingAlprEntity):
alpr = await asyncio.create_subprocess_exec( alpr = await asyncio.create_subprocess_exec(
*self._cmd, *self._cmd,
loop=self.hass.loop,
stdin=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL,

View file

@ -1,6 +1,7 @@
"""Config flow for Picnic integration.""" """Config flow for Picnic integration."""
from __future__ import annotations
import logging import logging
from typing import Tuple
from python_picnic_api import PicnicAPI from python_picnic_api import PicnicAPI
from python_picnic_api.session import PicnicAuthError from python_picnic_api.session import PicnicAuthError
@ -10,11 +11,7 @@ import voluptuous as vol
from homeassistant import config_entries, core, exceptions from homeassistant import config_entries, core, exceptions
from homeassistant.const import CONF_ACCESS_TOKEN, CONF_PASSWORD, CONF_USERNAME from homeassistant.const import CONF_ACCESS_TOKEN, CONF_PASSWORD, CONF_USERNAME
from .const import ( # pylint: disable=unused-import from .const import CONF_COUNTRY_CODE, COUNTRY_CODES, DOMAIN
CONF_COUNTRY_CODE,
COUNTRY_CODES,
DOMAIN,
)
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -33,7 +30,7 @@ class PicnicHub:
"""Hub class to test user authentication.""" """Hub class to test user authentication."""
@staticmethod @staticmethod
def authenticate(username, password, country_code) -> Tuple[str, dict]: def authenticate(username, password, country_code) -> tuple[str, dict]:
"""Test if we can authenticate with the Picnic API.""" """Test if we can authenticate with the Picnic API."""
picnic = PicnicAPI(username, password, country_code) picnic = PicnicAPI(username, password, country_code)
return picnic.session.auth_token, picnic.get_user() return picnic.session.auth_token, picnic.get_user()

View file

@ -1,6 +1,7 @@
"""Definition of Picnic sensors.""" """Definition of Picnic sensors."""
from __future__ import annotations
from typing import Any, Optional from typing import Any
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.const import ATTR_ATTRIBUTION from homeassistant.const import ATTR_ATTRIBUTION
@ -48,17 +49,17 @@ class PicnicSensor(CoordinatorEntity):
self._service_unique_id = config_entry.unique_id self._service_unique_id = config_entry.unique_id
@property @property
def unit_of_measurement(self) -> Optional[str]: def unit_of_measurement(self) -> str | None:
"""Return the unit this state is expressed in.""" """Return the unit this state is expressed in."""
return self.properties.get("unit") return self.properties.get("unit")
@property @property
def unique_id(self) -> Optional[str]: def unique_id(self) -> str | None:
"""Return a unique ID.""" """Return a unique ID."""
return f"{self._service_unique_id}.{self.sensor_type}" return f"{self._service_unique_id}.{self.sensor_type}"
@property @property
def name(self) -> Optional[str]: def name(self) -> str | None:
"""Return the name of the entity.""" """Return the name of the entity."""
return self._to_capitalized_name(self.sensor_type) return self._to_capitalized_name(self.sensor_type)
@ -69,12 +70,12 @@ class PicnicSensor(CoordinatorEntity):
return self.properties["state"](data_set) return self.properties["state"](data_set)
@property @property
def device_class(self) -> Optional[str]: def device_class(self) -> str | None:
"""Return the class of this device, from component DEVICE_CLASSES.""" """Return the class of this device, from component DEVICE_CLASSES."""
return self.properties.get("class") return self.properties.get("class")
@property @property
def icon(self) -> Optional[str]: def icon(self) -> str | None:
"""Return the icon to use in the frontend, if any.""" """Return the icon to use in the frontend, if any."""
return self.properties["icon"] return self.properties["icon"]

View file

@ -54,18 +54,18 @@ class HostSubProcess:
def ping(self): def ping(self):
"""Send an ICMP echo request and return True if success.""" """Send an ICMP echo request and return True if success."""
pinger = subprocess.Popen( with subprocess.Popen(
self._ping_cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL self._ping_cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL
) ) as pinger:
try: try:
pinger.communicate(timeout=1 + PING_TIMEOUT) pinger.communicate(timeout=1 + PING_TIMEOUT)
return pinger.returncode == 0 return pinger.returncode == 0
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
kill_subprocess(pinger) kill_subprocess(pinger)
return False return False
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
return False return False
def update(self) -> bool: def update(self) -> bool:
"""Update device state by sending one or more ping messages.""" """Update device state by sending one or more ping messages."""

View file

@ -75,6 +75,7 @@ class PushoverNotificationService(BaseNotificationService):
if self._hass.config.is_allowed_path(data[ATTR_ATTACHMENT]): if self._hass.config.is_allowed_path(data[ATTR_ATTACHMENT]):
# try to open it as a normal file. # try to open it as a normal file.
try: try:
# pylint: disable=consider-using-with
file_handle = open(data[ATTR_ATTACHMENT], "rb") file_handle = open(data[ATTR_ATTACHMENT], "rb")
# Replace the attachment identifier with file object. # Replace the attachment identifier with file object.
image = file_handle image = file_handle

View file

@ -56,9 +56,8 @@ def setup_platform(hass, config, add_entities, discovery_info=None):
# If no file path is defined, use a temporary file # If no file path is defined, use a temporary file
if file_path is None: if file_path is None:
temp_file = NamedTemporaryFile(suffix=".jpg", delete=False) with NamedTemporaryFile(suffix=".jpg", delete=False) as temp_file:
temp_file.close() file_path = temp_file.name
file_path = temp_file.name
setup_config[CONF_FILE_PATH] = file_path setup_config[CONF_FILE_PATH] = file_path
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, delete_temp_file) hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, delete_temp_file)

View file

@ -124,14 +124,14 @@ class ImageProcessingSsocr(ImageProcessingEntity):
img = Image.open(stream) img = Image.open(stream)
img.save(self.filepath, "png") img.save(self.filepath, "png")
ocr = subprocess.Popen( with subprocess.Popen(
self._command, stdout=subprocess.PIPE, stderr=subprocess.PIPE self._command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
) ) as ocr:
out = ocr.communicate() out = ocr.communicate()
if out[0] != b"": if out[0] != b"":
self._state = out[0].strip().decode("utf-8") self._state = out[0].strip().decode("utf-8")
else: else:
self._state = None self._state = None
_LOGGER.warning( _LOGGER.warning(
"Unable to detect value: %s", out[1].strip().decode("utf-8") "Unable to detect value: %s", out[1].strip().decode("utf-8")
) )

View file

@ -282,7 +282,7 @@ def load_data(
_LOGGER.warning("Can't load data in %s after %s retries", url, retry_num) _LOGGER.warning("Can't load data in %s after %s retries", url, retry_num)
elif filepath is not None: elif filepath is not None:
if hass.config.is_allowed_path(filepath): if hass.config.is_allowed_path(filepath):
return open(filepath, "rb") return open(filepath, "rb") # pylint: disable=consider-using-with
_LOGGER.warning("'%s' are not secure to load data from!", filepath) _LOGGER.warning("'%s' are not secure to load data from!", filepath)
else: else:

View file

@ -185,8 +185,7 @@ class TradfriLight(TradfriBaseDevice, LightEntity):
dimmer_command = None dimmer_command = None
if ATTR_BRIGHTNESS in kwargs: if ATTR_BRIGHTNESS in kwargs:
brightness = kwargs[ATTR_BRIGHTNESS] brightness = kwargs[ATTR_BRIGHTNESS]
if brightness > 254: brightness = min(brightness, 254)
brightness = 254
dimmer_data = { dimmer_data = {
ATTR_DIMMER: brightness, ATTR_DIMMER: brightness,
ATTR_TRANSITION_TIME: transition_time, ATTR_TRANSITION_TIME: transition_time,

View file

@ -1,6 +1,7 @@
"""Support for exposing Home Assistant via Zeroconf.""" """Support for exposing Home Assistant via Zeroconf."""
from __future__ import annotations from __future__ import annotations
from collections.abc import Iterable
from contextlib import suppress from contextlib import suppress
import fnmatch import fnmatch
from functools import partial from functools import partial
@ -8,7 +9,7 @@ import ipaddress
from ipaddress import ip_address from ipaddress import ip_address
import logging import logging
import socket import socket
from typing import Any, Iterable, TypedDict, cast from typing import Any, TypedDict, cast
from pyroute2 import IPRoute from pyroute2 import IPRoute
import voluptuous as vol import voluptuous as vol

View file

@ -2,7 +2,7 @@
from __future__ import annotations from __future__ import annotations
import logging import logging
from typing import Any, Dict, cast from typing import Any, cast
import voluptuous as vol import voluptuous as vol
@ -163,7 +163,7 @@ class ZoneStorageCollection(collection.StorageCollection):
async def _process_create_data(self, data: dict) -> dict: async def _process_create_data(self, data: dict) -> dict:
"""Validate the config is valid.""" """Validate the config is valid."""
return cast(Dict, self.CREATE_SCHEMA(data)) return cast(dict, self.CREATE_SCHEMA(data))
@callback @callback
def _get_suggested_id(self, info: dict) -> str: def _get_suggested_id(self, info: dict) -> str:
@ -291,7 +291,7 @@ class Zone(entity.Entity):
"""Return entity instance initialized from yaml storage.""" """Return entity instance initialized from yaml storage."""
zone = cls(config) zone = cls(config)
zone.editable = False zone.editable = False
zone._generate_attrs() # pylint:disable=protected-access zone._generate_attrs()
return zone return zone
@property @property

View file

@ -7,7 +7,7 @@ of entities and react to changes.
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
from collections.abc import Awaitable, Collection, Iterable, Mapping from collections.abc import Awaitable, Collection, Coroutine, Iterable, Mapping
import datetime import datetime
import enum import enum
import functools import functools
@ -18,7 +18,7 @@ import re
import threading import threading
from time import monotonic from time import monotonic
from types import MappingProxyType from types import MappingProxyType
from typing import TYPE_CHECKING, Any, Callable, Coroutine, Optional, TypeVar, cast from typing import TYPE_CHECKING, Any, Callable, Optional, TypeVar, cast
import attr import attr
import voluptuous as vol import voluptuous as vol

View file

@ -483,7 +483,7 @@ def schema_with_slug_keys(
for key in value.keys(): for key in value.keys():
slug_validator(key) slug_validator(key)
return cast(Dict, schema(value)) return cast(dict, schema(value))
return verify return verify

View file

@ -1,7 +1,7 @@
"""Selectors for Home Assistant.""" """Selectors for Home Assistant."""
from __future__ import annotations from __future__ import annotations
from typing import Any, Callable, Dict, cast from typing import Any, Callable, cast
import voluptuous as vol import voluptuous as vol
@ -31,7 +31,7 @@ def validate_selector(config: Any) -> dict:
return {selector_type: {}} return {selector_type: {}}
return { return {
selector_type: cast(Dict, selector_class.CONFIG_SCHEMA(config[selector_type])) selector_type: cast(dict, selector_class.CONFIG_SCHEMA(config[selector_type]))
} }

View file

@ -216,7 +216,6 @@ class RenderInfo:
self.exception: TemplateError | None = None self.exception: TemplateError | None = None
self.all_states = False self.all_states = False
self.all_states_lifecycle = False self.all_states_lifecycle = False
# pylint: disable=unsubscriptable-object # for abc.Set, https://github.com/PyCQA/pylint/pull/4275
self.domains: collections.abc.Set[str] = set() self.domains: collections.abc.Set[str] = set()
self.domains_lifecycle: collections.abc.Set[str] = set() self.domains_lifecycle: collections.abc.Set[str] = set()
self.entities: collections.abc.Set[str] = set() self.entities: collections.abc.Set[str] = set()

View file

@ -81,7 +81,7 @@ async def async_get_integration_with_requirements(
try: try:
await _async_process_integration(hass, integration, done) await _async_process_integration(hass, integration, done)
except Exception: # pylint: disable=broad-except except Exception:
del cache[domain] del cache[domain]
event.set() event.set()
raise raise

View file

@ -90,15 +90,15 @@ def install_package(
# Workaround for incompatible prefix setting # Workaround for incompatible prefix setting
# See http://stackoverflow.com/a/4495175 # See http://stackoverflow.com/a/4495175
args += ["--prefix="] args += ["--prefix="]
process = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, env=env) with Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, env=env) as process:
_, stderr = process.communicate() _, stderr = process.communicate()
if process.returncode != 0: if process.returncode != 0:
_LOGGER.error( _LOGGER.error(
"Unable to install package %s: %s", "Unable to install package %s: %s",
package, package,
stderr.decode("utf-8").lstrip().strip(), stderr.decode("utf-8").lstrip().strip(),
) )
return False return False
return True return True

View file

@ -6,7 +6,7 @@ from contextlib import suppress
import logging import logging
import os import os
from os import O_CREAT, O_TRUNC, O_WRONLY, stat_result from os import O_CREAT, O_TRUNC, O_WRONLY, stat_result
from typing import Dict, List, Union from typing import Union
import ruamel.yaml import ruamel.yaml
from ruamel.yaml import YAML # type: ignore from ruamel.yaml import YAML # type: ignore
@ -19,7 +19,7 @@ from homeassistant.util.yaml import secret_yaml
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
JSON_TYPE = Union[List, Dict, str] # pylint: disable=invalid-name JSON_TYPE = Union[list, dict, str] # pylint: disable=invalid-name
class ExtSafeConstructor(SafeConstructor): class ExtSafeConstructor(SafeConstructor):

View file

@ -7,7 +7,7 @@ import fnmatch
import logging import logging
import os import os
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List, TextIO, TypeVar, Union, overload from typing import Any, TextIO, TypeVar, Union, overload
import yaml import yaml
@ -18,8 +18,8 @@ from .objects import Input, NodeListClass, NodeStrClass
# mypy: allow-untyped-calls, no-warn-return-any # mypy: allow-untyped-calls, no-warn-return-any
JSON_TYPE = Union[List, Dict, str] # pylint: disable=invalid-name JSON_TYPE = Union[list, dict, str] # pylint: disable=invalid-name
DICT_T = TypeVar("DICT_T", bound=Dict) # pylint: disable=invalid-name DICT_T = TypeVar("DICT_T", bound=dict) # pylint: disable=invalid-name
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)

View file

@ -25,6 +25,7 @@ ignore = [
jobs = 2 jobs = 2
init-hook='from pylint.config.find_default_config_files import find_default_config_files; from pathlib import Path; import sys; sys.path.append(str(Path(Path(list(find_default_config_files())[0]).parent, "pylint/plugins")))' init-hook='from pylint.config.find_default_config_files import find_default_config_files; from pathlib import Path; import sys; sys.path.append(str(Path(Path(list(find_default_config_files())[0]).parent, "pylint/plugins")))'
load-plugins = [ load-plugins = [
"pylint.extensions.typing",
"pylint_strict_informational", "pylint_strict_informational",
"hass_logger" "hass_logger"
] ]
@ -109,6 +110,10 @@ overgeneral-exceptions = [
"HomeAssistantError", "HomeAssistantError",
] ]
[tool.pylint.TYPING]
py-version = "3.8"
runtime-typing = false
[tool.pytest.ini_options] [tool.pytest.ini_options]
testpaths = [ testpaths = [
"tests", "tests",

View file

@ -10,8 +10,8 @@ jsonpickle==1.4.1
mock-open==1.4.0 mock-open==1.4.0
mypy==0.812 mypy==0.812
pre-commit==2.12.1 pre-commit==2.12.1
pylint==2.7.4 pylint==2.8.0
astroid==2.5.2 astroid==2.5.5
pipdeptree==1.0.0 pipdeptree==1.0.0
pylint-strict-informational==0.1 pylint-strict-informational==0.1
pytest-aiohttp==0.3.0 pytest-aiohttp==0.3.0

View file

@ -94,21 +94,24 @@ async def test_subprocess_exceptions(caplog: Any, hass: HomeAssistant) -> None:
"""Test that notify subprocess exceptions are handled correctly.""" """Test that notify subprocess exceptions are handled correctly."""
with patch( with patch(
"homeassistant.components.command_line.notify.subprocess.Popen", "homeassistant.components.command_line.notify.subprocess.Popen"
side_effect=[
subprocess.TimeoutExpired("cmd", 10),
subprocess.SubprocessError(),
],
) as check_output: ) as check_output:
check_output.return_value.__enter__ = check_output
check_output.return_value.communicate.side_effect = [
subprocess.TimeoutExpired("cmd", 10),
None,
subprocess.SubprocessError(),
]
await setup_test_service(hass, {"command": "exit 0"}) await setup_test_service(hass, {"command": "exit 0"})
assert await hass.services.async_call( assert await hass.services.async_call(
DOMAIN, "test", {"message": "error"}, blocking=True DOMAIN, "test", {"message": "error"}, blocking=True
) )
assert check_output.call_count == 1 assert check_output.call_count == 2
assert "Timeout for command" in caplog.text assert "Timeout for command" in caplog.text
assert await hass.services.async_call( assert await hass.services.async_call(
DOMAIN, "test", {"message": "error"}, blocking=True DOMAIN, "test", {"message": "error"}, blocking=True
) )
assert check_output.call_count == 2 assert check_output.call_count == 4
assert "Error trying to exec command" in caplog.text assert "Error trying to exec command" in caplog.text

View file

@ -46,6 +46,7 @@ def lib_dir(deps_dir):
def mock_popen(lib_dir): def mock_popen(lib_dir):
"""Return a Popen mock.""" """Return a Popen mock."""
with patch("homeassistant.util.package.Popen") as popen_mock: with patch("homeassistant.util.package.Popen") as popen_mock:
popen_mock.return_value.__enter__ = popen_mock
popen_mock.return_value.communicate.return_value = ( popen_mock.return_value.communicate.return_value = (
bytes(lib_dir, "utf-8"), bytes(lib_dir, "utf-8"),
b"error", b"error",
@ -87,8 +88,8 @@ def test_install(mock_sys, mock_popen, mock_env_copy, mock_venv):
"""Test an install attempt on a package that doesn't exist.""" """Test an install attempt on a package that doesn't exist."""
env = mock_env_copy() env = mock_env_copy()
assert package.install_package(TEST_NEW_REQ, False) assert package.install_package(TEST_NEW_REQ, False)
assert mock_popen.call_count == 1 assert mock_popen.call_count == 2
assert mock_popen.call_args == call( assert mock_popen.mock_calls[0] == call(
[mock_sys.executable, "-m", "pip", "install", "--quiet", TEST_NEW_REQ], [mock_sys.executable, "-m", "pip", "install", "--quiet", TEST_NEW_REQ],
stdin=PIPE, stdin=PIPE,
stdout=PIPE, stdout=PIPE,
@ -102,8 +103,8 @@ def test_install_upgrade(mock_sys, mock_popen, mock_env_copy, mock_venv):
"""Test an upgrade attempt on a package.""" """Test an upgrade attempt on a package."""
env = mock_env_copy() env = mock_env_copy()
assert package.install_package(TEST_NEW_REQ) assert package.install_package(TEST_NEW_REQ)
assert mock_popen.call_count == 1 assert mock_popen.call_count == 2
assert mock_popen.call_args == call( assert mock_popen.mock_calls[0] == call(
[ [
mock_sys.executable, mock_sys.executable,
"-m", "-m",
@ -140,8 +141,8 @@ def test_install_target(mock_sys, mock_popen, mock_env_copy, mock_venv):
] ]
assert package.install_package(TEST_NEW_REQ, False, target=target) assert package.install_package(TEST_NEW_REQ, False, target=target)
assert mock_popen.call_count == 1 assert mock_popen.call_count == 2
assert mock_popen.call_args == call( assert mock_popen.mock_calls[0] == call(
args, stdin=PIPE, stdout=PIPE, stderr=PIPE, env=env args, stdin=PIPE, stdout=PIPE, stderr=PIPE, env=env
) )
assert mock_popen.return_value.communicate.call_count == 1 assert mock_popen.return_value.communicate.call_count == 1
@ -169,8 +170,8 @@ def test_install_constraint(mock_sys, mock_popen, mock_env_copy, mock_venv):
env = mock_env_copy() env = mock_env_copy()
constraints = "constraints_file.txt" constraints = "constraints_file.txt"
assert package.install_package(TEST_NEW_REQ, False, constraints=constraints) assert package.install_package(TEST_NEW_REQ, False, constraints=constraints)
assert mock_popen.call_count == 1 assert mock_popen.call_count == 2
assert mock_popen.call_args == call( assert mock_popen.mock_calls[0] == call(
[ [
mock_sys.executable, mock_sys.executable,
"-m", "-m",
@ -194,8 +195,8 @@ def test_install_find_links(mock_sys, mock_popen, mock_env_copy, mock_venv):
env = mock_env_copy() env = mock_env_copy()
link = "https://wheels-repository" link = "https://wheels-repository"
assert package.install_package(TEST_NEW_REQ, False, find_links=link) assert package.install_package(TEST_NEW_REQ, False, find_links=link)
assert mock_popen.call_count == 1 assert mock_popen.call_count == 2
assert mock_popen.call_args == call( assert mock_popen.mock_calls[0] == call(
[ [
mock_sys.executable, mock_sys.executable,
"-m", "-m",