Use contextlib.suppress where possible (#48189)

This commit is contained in:
Franck Nijhof 2021-03-23 14:36:43 +01:00 committed by GitHub
parent 9656f260a4
commit 6932cf9534
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
86 changed files with 238 additions and 398 deletions

View file

@ -2,6 +2,7 @@
from __future__ import annotations
import asyncio
from contextlib import suppress
import aiopulse
import async_timeout
@ -37,13 +38,10 @@ class AcmedaFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
}
hubs = []
try:
with async_timeout.timeout(5):
with suppress(asyncio.TimeoutError), async_timeout.timeout(5):
async for hub in aiopulse.Hub.discover():
if hub.id not in already_configured:
hubs.append(hub)
except asyncio.TimeoutError:
pass
if len(hubs) == 0:
return self.async_abort(reason="no_devices_found")

View file

@ -1,4 +1,5 @@
"""Support for Amcrest IP cameras."""
from contextlib import suppress
from datetime import timedelta
import logging
import threading
@ -191,10 +192,8 @@ class AmcrestChecker(Http):
def _wrap_test_online(self, now):
"""Test if camera is back online."""
_LOGGER.debug("Testing if %s back online", self._wrap_name)
try:
self.current_time
except AmcrestError:
pass
with suppress(AmcrestError):
self.current_time # pylint: disable=pointless-statement
def _monitor_events(hass, name, api, event_codes):

View file

@ -1,4 +1,5 @@
"""Support for Amcrest IP camera binary sensors."""
from contextlib import suppress
from datetime import timedelta
import logging
@ -154,10 +155,8 @@ class AmcrestBinarySensor(BinarySensorEntity):
# Send a command to the camera to test if we can still communicate with it.
# Override of Http.command() in __init__.py will set self._api.available
# accordingly.
try:
self._api.current_time
except AmcrestError:
pass
with suppress(AmcrestError):
self._api.current_time # pylint: disable=pointless-statement
self._state = self._api.available
def _update_others(self):

View file

@ -1,5 +1,6 @@
"""Rest API for Home Assistant."""
import asyncio
from contextlib import suppress
import json
import logging
@ -196,15 +197,11 @@ class APIDiscoveryView(HomeAssistantView):
ATTR_VERSION: __version__,
}
try:
with suppress(NoURLAvailableError):
data["external_url"] = get_url(hass, allow_internal=False)
except NoURLAvailableError:
pass
try:
with suppress(NoURLAvailableError):
data["internal_url"] = get_url(hass, allow_external=False)
except NoURLAvailableError:
pass
# Set old base URL based on external or internal
data["base_url"] = data["external_url"] or data["internal_url"]

View file

@ -1,4 +1,5 @@
"""APNS Notification platform."""
from contextlib import suppress
import logging
from apns2.client import APNsClient
@ -155,7 +156,7 @@ class ApnsNotificationService(BaseNotificationService):
self.device_states = {}
self.topic = topic
try:
with suppress(FileNotFoundError):
self.devices = {
str(key): ApnsDevice(
str(key),
@ -165,8 +166,6 @@ class ApnsNotificationService(BaseNotificationService):
)
for (key, value) in load_yaml_config_file(self.yaml_path).items()
}
except FileNotFoundError:
pass
tracking_ids = [
device.full_tracking_device_id

View file

@ -1,5 +1,6 @@
"""Arcam component."""
import asyncio
from contextlib import suppress
import logging
from arcam.fmj import ConnectionFailed
@ -28,10 +29,8 @@ CONFIG_SCHEMA = cv.deprecated(DOMAIN)
async def _await_cancel(task):
task.cancel()
try:
with suppress(asyncio.CancelledError):
await task
except asyncio.CancelledError:
pass
async def async_setup(hass: HomeAssistantType, config: ConfigType):

View file

@ -1,5 +1,6 @@
"""Config validation helper for the automation integration."""
import asyncio
from contextlib import suppress
import voluptuous as vol
@ -88,11 +89,8 @@ class AutomationConfig(dict):
async def _try_async_validate_config_item(hass, config, full_config=None):
"""Validate config item."""
raw_config = None
try:
with suppress(ValueError):
raw_config = dict(config)
except ValueError:
# Invalid config
pass
try:
config = await async_validate_config_item(hass, config, full_config)

View file

@ -1,4 +1,6 @@
"""Support for Bizkaibus, Biscay (Basque Country, Spain) Bus service."""
from contextlib import suppress
from bizkaibus.bizkaibus import BizkaibusData
import voluptuous as vol
@ -61,10 +63,8 @@ class BizkaibusSensor(SensorEntity):
def update(self):
"""Get the latest data from the webservice."""
self.data.update()
try:
with suppress(TypeError):
self._state = self.data.info[0][ATTR_DUE_IN]
except TypeError:
pass
class Bizkaibus:

View file

@ -1,4 +1,5 @@
"""Support for BME280 temperature, humidity and pressure sensor."""
from contextlib import suppress
from datetime import timedelta
from functools import partial
import logging
@ -110,13 +111,11 @@ async def async_setup_platform(hass, config, async_add_entities, discovery_info=
sensor_handler = await hass.async_add_executor_job(BME280Handler, sensor)
dev = []
try:
with suppress(KeyError):
for variable in config[CONF_MONITORED_CONDITIONS]:
dev.append(
BME280Sensor(sensor_handler, variable, SENSOR_TYPES[variable][1], name)
)
except KeyError:
pass
async_add_entities(dev, True)

View file

@ -1,5 +1,6 @@
"""Support for Broadlink devices."""
import asyncio
from contextlib import suppress
from functools import partial
import logging
@ -102,10 +103,8 @@ class BroadlinkDevice:
self.hass.data[DOMAIN].devices[config.entry_id] = self
self.reset_jobs.append(config.add_update_listener(self.async_update))
try:
with suppress(BroadlinkException, OSError):
self.fw_version = await self.hass.async_add_executor_job(api.get_fwversion)
except (BroadlinkException, OSError):
pass
# Forward entry setup to related domains.
tasks = (

View file

@ -2,6 +2,7 @@
from __future__ import annotations
import asyncio
from contextlib import suppress
from datetime import timedelta
import functools as ft
import json
@ -330,21 +331,14 @@ class CastDevice(MediaPlayerEntity):
tts_base_url = None
url_description = ""
if "tts" in self.hass.config.components:
try:
with suppress(KeyError): # base_url not configured
tts_base_url = self.hass.components.tts.get_base_url(self.hass)
except KeyError:
# base_url not configured, ignore
pass
try:
with suppress(NoURLAvailableError): # external_url not configured
external_url = get_url(self.hass, allow_internal=False)
except NoURLAvailableError:
# external_url not configured, ignore
pass
try:
with suppress(NoURLAvailableError): # internal_url not configured
internal_url = get_url(self.hass, allow_external=False)
except NoURLAvailableError:
# internal_url not configured, ignore
pass
if media_status.content_id:
if tts_base_url and media_status.content_id.startswith(tts_base_url):

View file

@ -1,5 +1,6 @@
"""Alexa configuration for Home Assistant Cloud."""
import asyncio
from contextlib import suppress
from datetime import timedelta
import logging
@ -322,7 +323,5 @@ class AlexaConfig(alexa_config.AbstractConfig):
if "old_entity_id" in event.data:
to_remove.append(event.data["old_entity_id"])
try:
with suppress(alexa_errors.NoTokenAvailable):
await self._sync_helper(to_update, to_remove)
except alexa_errors.NoTokenAvailable:
pass

View file

@ -6,6 +6,7 @@ This will return a request id that has to be used for future calls.
A callback has to be provided to `request_config` which will be called when
the user has submitted configuration information.
"""
from contextlib import suppress
import functools as ft
from homeassistant.const import (
@ -96,11 +97,8 @@ def request_config(hass, *args, **kwargs):
@async_callback
def async_notify_errors(hass, request_id, error):
"""Add errors to a config request."""
try:
with suppress(KeyError): # If request_id does not exist
hass.data[DATA_REQUESTS][request_id].async_notify_errors(request_id, error)
except KeyError:
# If request_id does not exist
pass
@bind_hass
@ -115,11 +113,8 @@ def notify_errors(hass, request_id, error):
@async_callback
def async_request_done(hass, request_id):
"""Mark a configuration request as done."""
try:
with suppress(KeyError): # If request_id does not exist
hass.data[DATA_REQUESTS].pop(request_id).async_request_done(request_id)
except KeyError:
# If request_id does not exist
pass
@bind_hass

View file

@ -1,5 +1,6 @@
"""Support for Denon AVR receivers using their HTTP interface."""
from contextlib import suppress
import logging
from homeassistant.components.media_player import MediaPlayerEntity
@ -372,11 +373,9 @@ class DenonDevice(MediaPlayerEntity):
volume_denon = float((volume * 100) - 80)
if volume_denon > 18:
volume_denon = float(18)
try:
with suppress(ValueError):
if self._receiver.set_volume(volume_denon):
self._volume = volume_denon
except ValueError:
pass
def mute_volume(self, mute):
"""Send mute command."""

View file

@ -1,4 +1,5 @@
"""Support for Adafruit DHT temperature and humidity sensor."""
from contextlib import suppress
from datetime import timedelta
import logging
@ -74,7 +75,7 @@ def setup_platform(hass, config, add_entities, discovery_info=None):
dev = []
name = config[CONF_NAME]
try:
with suppress(KeyError):
for variable in config[CONF_MONITORED_CONDITIONS]:
dev.append(
DHTSensor(
@ -86,8 +87,6 @@ def setup_platform(hass, config, add_entities, discovery_info=None):
humidity_offset,
)
)
except KeyError:
pass
add_entities(dev, True)

View file

@ -1,6 +1,7 @@
"""The dsmr component."""
import asyncio
from asyncio import CancelledError
from contextlib import suppress
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
@ -36,10 +37,8 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry):
# Cancel the reconnect task
task.cancel()
try:
with suppress(CancelledError):
await task
except CancelledError:
pass
unload_ok = all(
await asyncio.gather(

View file

@ -3,6 +3,7 @@ from __future__ import annotations
import asyncio
from asyncio import CancelledError
from contextlib import suppress
from datetime import timedelta
from functools import partial
import logging
@ -342,10 +343,8 @@ class DSMREntity(SensorEntity):
if self._obis == obis_ref.ELECTRICITY_ACTIVE_TARIFF:
return self.translate_tariff(value, self._config[CONF_DSMR_VERSION])
try:
with suppress(TypeError):
value = round(float(value), self._config[CONF_PRECISION])
except TypeError:
pass
if value is not None:
return value

View file

@ -4,6 +4,7 @@ Support for Dublin RTPI information from data.dublinked.ie.
For more info on the API see :
https://data.gov.ie/dataset/real-time-passenger-information-rtpi-for-dublin-bus-bus-eireann-luas-and-irish-rail/resource/4b9f2c4f-6bf5-4958-a43a-f12dab04cf61
"""
from contextlib import suppress
from datetime import datetime, timedelta
import requests
@ -117,10 +118,8 @@ class DublinPublicTransportSensor(SensorEntity):
"""Get the latest data from opendata.ch and update the states."""
self.data.update()
self._times = self.data.info
try:
with suppress(TypeError):
self._state = self._times[0][ATTR_DUE_IN]
except TypeError:
pass
class PublicTransportData:

View file

@ -1,4 +1,5 @@
"""Support for local control of entities by emulating a Philips Hue bridge."""
from contextlib import suppress
import logging
from aiohttp import web
@ -341,8 +342,6 @@ class Config:
def _load_json(filename):
"""Load JSON, handling invalid syntax."""
try:
with suppress(HomeAssistantError):
return load_json(filename)
except HomeAssistantError:
pass
return {}

View file

@ -1,4 +1,6 @@
"""Support for Fibaro sensors."""
from contextlib import suppress
from homeassistant.components.sensor import DOMAIN, SensorEntity
from homeassistant.const import (
CONCENTRATION_PARTS_PER_MILLION,
@ -71,7 +73,7 @@ class FibaroSensor(FibaroDevice, SensorEntity):
self._unit = None
self._icon = None
self._device_class = None
try:
with suppress(KeyError, ValueError):
if not self._unit:
if self.fibaro_device.properties.unit == "lux":
self._unit = LIGHT_LUX
@ -81,8 +83,6 @@ class FibaroSensor(FibaroDevice, SensorEntity):
self._unit = TEMP_FAHRENHEIT
else:
self._unit = self.fibaro_device.properties.unit
except (KeyError, ValueError):
pass
@property
def state(self):
@ -106,7 +106,5 @@ class FibaroSensor(FibaroDevice, SensorEntity):
def update(self):
"""Update the state."""
try:
with suppress(KeyError, ValueError):
self.current_value = float(self.fibaro_device.properties.value)
except (KeyError, ValueError):
pass

View file

@ -1,4 +1,5 @@
"""Config flow to configure forked-daapd devices."""
from contextlib import suppress
import logging
from pyforked_daapd import ForkedDaapdAPI
@ -161,12 +162,10 @@ class ForkedDaapdFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
if discovery_info.get("properties") and discovery_info["properties"].get(
"Machine Name"
):
try:
with suppress(ValueError):
version_num = int(
discovery_info["properties"].get("mtd-version", "0").split(".")[0]
)
except ValueError:
pass
if version_num < 27:
return self.async_abort(reason="not_forked_daapd")
await self.async_set_unique_id(discovery_info["properties"]["Machine Name"])

View file

@ -1,4 +1,5 @@
"""Base class for fritzbox_callmonitor entities."""
from contextlib import suppress
from datetime import timedelta
import logging
import re
@ -69,11 +70,7 @@ class FritzBoxPhonebook:
return UNKOWN_NAME
for prefix in self.prefixes:
try:
with suppress(KeyError):
return self.number_dict[prefix + number]
except KeyError:
pass
try:
with suppress(KeyError):
return self.number_dict[prefix + number.lstrip("0")]
except KeyError:
pass

View file

@ -1,4 +1,5 @@
"""Support for sending data to a Graphite installation."""
from contextlib import suppress
import logging
import queue
import socket
@ -111,10 +112,8 @@ class GraphiteFeeder(threading.Thread):
"""Report the attributes."""
now = time.time()
things = dict(new_state.attributes)
try:
with suppress(ValueError):
things["state"] = state.state_as_number(new_state)
except ValueError:
pass
lines = [
"%s.%s.%s %f %i"
% (self._prefix, entity_id, key.replace(" ", "_"), value, now)

View file

@ -1,5 +1,6 @@
"""The Hangouts Bot."""
import asyncio
from contextlib import suppress
import io
import logging
@ -103,12 +104,10 @@ class HangoutsBot:
self._conversation_intents[conv_id][intent_type] = data
try:
with suppress(ValueError):
self._conversation_list.on_event.remove_observer(
self._async_handle_conversation_event
)
except ValueError:
pass
self._conversation_list.on_event.add_observer(
self._async_handle_conversation_event
)

View file

@ -1,4 +1,5 @@
"""HTML5 Push Messaging notification service."""
from contextlib import suppress
from datetime import datetime, timedelta
from functools import partial
import json
@ -202,10 +203,8 @@ def get_service(hass, config, discovery_info=None):
def _load_config(filename):
"""Load configuration."""
try:
with suppress(HomeAssistantError):
return load_json(filename)
except HomeAssistantError:
pass
return {}
@ -325,10 +324,8 @@ class HTML5PushCallbackView(HomeAssistantView):
if target_check.get(ATTR_TARGET) in self.registrations:
possible_target = self.registrations[target_check[ATTR_TARGET]]
key = possible_target[ATTR_SUBSCRIPTION][ATTR_KEYS][ATTR_AUTH]
try:
with suppress(jwt.exceptions.DecodeError):
return jwt.decode(token, key, algorithms=["ES256", "HS256"])
except jwt.exceptions.DecodeError:
pass
return self.json_message(
"No target found in JWT", status_code=HTTP_UNAUTHORIZED

View file

@ -2,6 +2,7 @@
from __future__ import annotations
from collections import defaultdict
from contextlib import suppress
from datetime import datetime
from ipaddress import ip_address
import logging
@ -99,12 +100,10 @@ async def process_wrong_login(request):
remote_addr = ip_address(request.remote)
remote_host = request.remote
try:
with suppress(herror):
remote_host, _, _ = await hass.async_add_executor_job(
gethostbyaddr, request.remote
)
except herror:
pass
base_msg = f"Login attempt or request with invalid authentication from {remote_host} ({remote_addr})."

View file

@ -2,6 +2,7 @@
from __future__ import annotations
from collections import defaultdict
from contextlib import suppress
from datetime import timedelta
from functools import partial
import ipaddress
@ -161,10 +162,8 @@ class Router:
(KEY_DEVICE_BASIC_INFORMATION, "devicename"),
(KEY_DEVICE_INFORMATION, "DeviceName"),
):
try:
with suppress(KeyError, TypeError):
return cast(str, self.data[key][item])
except (KeyError, TypeError):
pass
return DEFAULT_DEVICE_NAME
@property

View file

@ -1,5 +1,6 @@
"""Support for hunter douglas shades."""
import asyncio
from contextlib import suppress
import logging
from aiopvapi.helpers.constants import ATTR_POSITION1, ATTR_POSITION_DATA
@ -65,12 +66,10 @@ async def async_setup_entry(hass, entry, async_add_entities):
# possible
shade = PvShade(raw_shade, pv_request)
name_before_refresh = shade.name
try:
with suppress(asyncio.TimeoutError):
async with async_timeout.timeout(1):
await shade.refresh()
except asyncio.TimeoutError:
# Forced refresh is not required for setup
pass
if ATTR_POSITION_DATA not in shade.raw_data:
_LOGGER.info(
"The %s shade was skipped because it is missing position data",

View file

@ -2,6 +2,7 @@
from __future__ import annotations
import asyncio
from contextlib import suppress
import logging
from typing import Any, Callable, cast
@ -159,7 +160,7 @@ async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> b
raise ConfigEntryNotReady
version = await hyperion_client.async_sysinfo_version()
if version is not None:
try:
with suppress(ValueError):
if AwesomeVersion(version) < AwesomeVersion(HYPERION_VERSION_WARN_CUTOFF):
_LOGGER.warning(
"Using a Hyperion server version < %s is not recommended -- "
@ -168,8 +169,6 @@ async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> b
HYPERION_VERSION_WARN_CUTOFF,
HYPERION_RELEASES_URL,
)
except ValueError:
pass
# Client needs authentication, but no token provided? => Reauth.
auth_resp = await hyperion_client.async_is_auth_required()

View file

@ -2,6 +2,7 @@
from __future__ import annotations
import asyncio
from contextlib import suppress
import logging
from typing import Any
from urllib.parse import urlparse
@ -257,10 +258,8 @@ class HyperionConfigFlow(ConfigFlow, domain=DOMAIN):
if not self._request_token_task.done():
self._request_token_task.cancel()
try:
with suppress(asyncio.CancelledError):
await self._request_token_task
except asyncio.CancelledError:
pass
self._request_token_task = None
async def _request_token_task_func(self, auth_id: str) -> None:

View file

@ -1,6 +1,7 @@
"""Support for sending data to an Influx database."""
from __future__ import annotations
from contextlib import suppress
from dataclasses import dataclass
import logging
import math
@ -304,11 +305,9 @@ def _generate_event_to_json(conf: dict) -> Callable[[dict], str]:
)
# Infinity and NaN are not valid floats in InfluxDB
try:
with suppress(KeyError, TypeError):
if not math.isfinite(json[INFLUX_CONF_FIELDS][key]):
del json[INFLUX_CONF_FIELDS][key]
except (KeyError, TypeError):
pass
json[INFLUX_CONF_TAGS].update(tags)
@ -382,10 +381,8 @@ def get_influx_connection(conf, test_write=False, test_read=False):
if test_write:
# Try to write b"" to influx. If we can connect and creds are valid
# Then invalid inputs is returned. Anything else is a broken config
try:
with suppress(ValueError):
write_v2(b"")
except ValueError:
pass
write_api = influx.write_api(write_options=ASYNCHRONOUS)
if test_read:
@ -530,7 +527,7 @@ class InfluxThread(threading.Thread):
dropped = 0
try:
with suppress(queue.Empty):
while len(json) < BATCH_BUFFER_SIZE and not self.shutdown:
timeout = None if count == 0 else self.batch_timeout()
item = self.queue.get(timeout=timeout)
@ -549,9 +546,6 @@ class InfluxThread(threading.Thread):
else:
dropped += 1
except queue.Empty:
pass
if dropped:
_LOGGER.warning(CATCHING_UP_MESSAGE, dropped)

View file

@ -1,5 +1,6 @@
"""Support for INSTEON Modems (PLM and Hub)."""
import asyncio
from contextlib import suppress
import logging
from pyinsteon import async_close, async_connect, devices
@ -37,10 +38,8 @@ async def async_get_device_config(hass, config_entry):
# Make a copy of addresses due to edge case where the list of devices could change during status update
# Cannot be done concurrently due to issues with the underlying protocol.
for address in list(devices):
try:
with suppress(AttributeError):
await devices[address].async_status()
except AttributeError:
pass
await devices.async_load(id_devices=1)
for addr in devices:

View file

@ -1,6 +1,7 @@
"""Config flow for izone."""
import asyncio
from contextlib import suppress
import logging
from async_timeout import timeout
@ -28,11 +29,9 @@ async def _async_has_devices(hass):
disco = await async_start_discovery_service(hass)
try:
with suppress(asyncio.TimeoutError):
async with timeout(TIMEOUT_DISCOVERY):
await controller_ready.wait()
except asyncio.TimeoutError:
pass
if not disco.pi_disco.controllers:
await async_stop_discovery_service(hass)

View file

@ -1,6 +1,7 @@
"""Receive signals from a keyboard and use it as a remote control."""
# pylint: disable=import-error
import asyncio
from contextlib import suppress
import logging
import os
@ -255,10 +256,8 @@ class KeyboardRemote:
async def async_stop_monitoring(self):
"""Stop event monitoring task and issue event."""
if self.monitor_task is not None:
try:
with suppress(OSError):
await self.hass.async_add_executor_job(self.dev.ungrab)
except OSError:
pass
# monitoring of the device form the event loop and closing of the
# device has to occur before cancelling the task to avoid
# triggering unhandled exceptions inside evdev coroutines

View file

@ -1,4 +1,5 @@
"""Support for media browsing."""
from contextlib import suppress
import logging
from homeassistant.components.media_player import BrowseError, BrowseMedia
@ -171,10 +172,8 @@ async def build_item_response(media_library, payload):
children = []
for item in media:
try:
with suppress(UnknownMediaType):
children.append(item_payload(item, media_library))
except UnknownMediaType:
pass
if search_type in (MEDIA_TYPE_TVSHOW, MEDIA_TYPE_MOVIE) and search_id == "":
children.sort(key=lambda x: x.title.replace("The ", "", 1), reverse=False)

View file

@ -1,4 +1,5 @@
"""Event parser and human readable log generator."""
from contextlib import suppress
from datetime import timedelta
from itertools import groupby
import json
@ -384,10 +385,8 @@ def humanify(hass, events, entity_attr_cache, context_lookup):
domain = event_data.get(ATTR_DOMAIN)
entity_id = event_data.get(ATTR_ENTITY_ID)
if domain is None and entity_id is not None:
try:
with suppress(IndexError):
domain = split_entity_id(str(entity_id))[0]
except IndexError:
pass
data = {
"when": event.time_fired_isoformat,

View file

@ -4,6 +4,7 @@ from __future__ import annotations
import asyncio
import base64
import collections
from contextlib import suppress
from datetime import timedelta
import functools as ft
import hashlib
@ -935,19 +936,14 @@ class MediaPlayerEntity(Entity):
"""Retrieve an image."""
content, content_type = (None, None)
websession = async_get_clientsession(self.hass)
try:
with async_timeout.timeout(10):
with suppress(asyncio.TimeoutError), async_timeout.timeout(10):
response = await websession.get(url)
if response.status == HTTP_OK:
content = await response.read()
content_type = response.headers.get(CONTENT_TYPE)
if content_type:
content_type = content_type.split(";")[0]
except asyncio.TimeoutError:
pass
if content is None:
_LOGGER.warning("Error retrieving proxied image from %s", url)

View file

@ -1,4 +1,5 @@
"""Config flow for Minecraft Server integration."""
from contextlib import suppress
from functools import partial
import ipaddress
@ -40,10 +41,8 @@ class MinecraftServerConfigFlow(ConfigFlow, domain=DOMAIN):
host = address_right
else:
host = address_left
try:
with suppress(ValueError):
port = int(address_right)
except ValueError:
pass # 'port' is already set to default value.
# Remove '[' and ']' in case of an IPv6 address.
host = host.strip("[]")

View file

@ -1,5 +1,6 @@
"""Integrates Native Apps to Home Assistant."""
import asyncio
from contextlib import suppress
from homeassistant.components import cloud, notify as hass_notify
from homeassistant.components.webhook import (
@ -51,12 +52,10 @@ async def async_setup(hass: HomeAssistantType, config: ConfigType):
hass.http.register_view(RegistrationsView())
for deleted_id in hass.data[DOMAIN][DATA_DELETED_IDS]:
try:
with suppress(ValueError):
webhook_register(
hass, DOMAIN, "Deleted Webhook", deleted_id, handle_webhook
)
except ValueError:
pass
hass.async_create_task(
discovery.async_load_platform(hass, "notify", DOMAIN, {}, config)
@ -129,7 +128,5 @@ async def async_remove_entry(hass, entry):
await store.async_save(savable_state(hass))
if CONF_CLOUDHOOK_URL in entry.data:
try:
with suppress(cloud.CloudNotAvailable):
await cloud.async_delete_cloudhook(hass, entry.data[CONF_WEBHOOK_ID])
except cloud.CloudNotAvailable:
pass

View file

@ -1,6 +1,7 @@
"""Provides an HTTP API for mobile_app."""
from __future__ import annotations
from contextlib import suppress
import secrets
from aiohttp.web import Request, Response
@ -98,10 +99,8 @@ class RegistrationsView(HomeAssistantView):
)
remote_ui_url = None
try:
with suppress(hass.components.cloud.CloudNotAvailable):
remote_ui_url = hass.components.cloud.async_remote_ui_url()
except hass.components.cloud.CloudNotAvailable:
pass
return self.json(
{

View file

@ -1,5 +1,6 @@
"""Webhook handlers for mobile_app."""
import asyncio
from contextlib import suppress
from functools import wraps
import logging
import secrets
@ -551,10 +552,8 @@ async def webhook_get_config(hass, config_entry, data):
if CONF_CLOUDHOOK_URL in config_entry.data:
resp[CONF_CLOUDHOOK_URL] = config_entry.data[CONF_CLOUDHOOK_URL]
try:
with suppress(hass.components.cloud.CloudNotAvailable):
resp[CONF_REMOTE_UI_URL] = hass.components.cloud.async_remote_ui_url()
except hass.components.cloud.CloudNotAvailable:
pass
return webhook_response(resp, registration=config_entry.data)

View file

@ -1,5 +1,6 @@
"""The motion_blinds component."""
import asyncio
from contextlib import suppress
from datetime import timedelta
import logging
from socket import timeout
@ -64,19 +65,13 @@ async def async_setup_entry(
"""Call all updates using one async_add_executor_job."""
motion_gateway.Update()
for blind in motion_gateway.device_list.values():
try:
with suppress(timeout):
blind.Update()
except timeout:
# let the error be logged and handled by the motionblinds library
pass
async def async_update_data():
"""Fetch data from the gateway and blinds."""
try:
with suppress(timeout): # Let the error be handled by the motionblinds
await hass.async_add_executor_job(update_gateway)
except timeout:
# let the error be logged and handled by the motionblinds library
pass
coordinator = DataUpdateCoordinator(
hass,

View file

@ -1,4 +1,5 @@
"""Support to interact with a Music Player Daemon."""
from contextlib import suppress
from datetime import timedelta
import hashlib
import logging
@ -129,10 +130,8 @@ class MpdDevice(MediaPlayerEntity):
def _disconnect(self):
"""Disconnect from MPD."""
try:
with suppress(mpd.ConnectionError):
self._client.disconnect()
except mpd.ConnectionError:
pass
self._is_connected = False
self._status = None

View file

@ -1,4 +1,5 @@
"""Support for MQTT JSON lights."""
from contextlib import suppress
import json
import logging
@ -245,10 +246,8 @@ class MqttLightJson(MqttEntity, LightEntity, RestoreEntity):
_LOGGER.warning("Invalid color temp value received")
if self._supported_features and SUPPORT_EFFECT:
try:
with suppress(KeyError):
self._effect = values["effect"]
except KeyError:
pass
if self._supported_features and SUPPORT_WHITE_VALUE:
try:

View file

@ -1,4 +1,5 @@
"""Offer MQTT listening automation rules."""
from contextlib import suppress
import json
import logging
@ -79,10 +80,8 @@ async def async_attach_trigger(hass, config, action, automation_info):
"description": f"mqtt topic {mqttmsg.topic}",
}
try:
with suppress(ValueError):
data["payload_json"] = json.loads(mqttmsg.payload)
except ValueError:
pass
hass.async_run_hass_job(job, {"trigger": data})

View file

@ -1,6 +1,7 @@
"""Config flow for MySensors."""
from __future__ import annotations
from contextlib import suppress
import logging
import os
from typing import Any
@ -62,15 +63,14 @@ def _get_schema_common(user_input: dict[str, str]) -> dict:
def _validate_version(version: str) -> dict[str, str]:
"""Validate a version string from the user."""
version_okay = False
try:
with suppress(AwesomeVersionStrategyException):
version_okay = bool(
AwesomeVersion.ensure_strategy(
version,
[AwesomeVersionStrategy.SIMPLEVER, AwesomeVersionStrategy.SEMVER],
)
)
except AwesomeVersionStrategyException:
pass
if version_okay:
return {}
return {CONF_VERSION: "invalid_version"}

View file

@ -2,6 +2,7 @@
from __future__ import annotations
import asyncio
from contextlib import suppress
import datetime as dt
import os
@ -241,25 +242,19 @@ class ONVIFDevice:
async def async_get_capabilities(self):
"""Obtain information about the available services on the device."""
snapshot = False
try:
with suppress(ONVIFError, Fault, RequestError):
media_service = self.device.create_media_service()
media_capabilities = await media_service.GetServiceCapabilities()
snapshot = media_capabilities and media_capabilities.SnapshotUri
except (ONVIFError, Fault, RequestError):
pass
pullpoint = False
try:
with suppress(ONVIFError, Fault, RequestError):
pullpoint = await self.events.async_start()
except (ONVIFError, Fault, RequestError):
pass
ptz = False
try:
with suppress(ONVIFError, Fault, RequestError):
self.device.get_definition("ptz")
ptz = True
except (ONVIFError, Fault, RequestError):
pass
return Capabilities(snapshot, pullpoint, ptz)

View file

@ -2,6 +2,7 @@
from __future__ import annotations
import asyncio
from contextlib import suppress
import datetime as dt
from typing import Callable
@ -86,10 +87,8 @@ class EventManager:
# Initialize events
pullpoint = self.device.create_pullpoint_service()
try:
with suppress(*SUBSCRIPTION_ERRORS):
await pullpoint.SetSynchronizationPoint()
except SUBSCRIPTION_ERRORS:
pass
response = await pullpoint.PullMessages(
{"MessageLimit": 100, "Timeout": dt.timedelta(seconds=5)}
)
@ -119,10 +118,9 @@ class EventManager:
return
if self._subscription:
try:
# Suppressed. The subscription may no longer exist.
with suppress(*SUBSCRIPTION_ERRORS):
await self._subscription.Unsubscribe()
except SUBSCRIPTION_ERRORS:
pass # Ignored. The subscription may no longer exist.
self._subscription = None
try:

View file

@ -1,5 +1,6 @@
"""The ozw integration."""
import asyncio
from contextlib import suppress
import json
import logging
@ -280,10 +281,8 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry):
Do not unsubscribe the manager topic.
"""
mqtt_client_task.cancel()
try:
with suppress(asyncio.CancelledError):
await mqtt_client_task
except asyncio.CancelledError:
pass
ozw_data[DATA_UNSUBSCRIBE].append(
hass.bus.async_listen_once(

View file

@ -2,6 +2,7 @@
from __future__ import annotations
import asyncio
from contextlib import suppress
from datetime import timedelta
from functools import partial
import logging
@ -242,10 +243,8 @@ class PingDataSubProcess(PingData):
self._count + PING_TIMEOUT,
)
if pinger:
try:
with suppress(TypeError):
await pinger.kill()
except TypeError:
pass
del pinger
return False

View file

@ -1,5 +1,6 @@
"""Support for monitoring plants."""
from collections import deque
from contextlib import suppress
from datetime import datetime, timedelta
import logging
@ -324,12 +325,10 @@ class Plant(Entity):
for state in states:
# filter out all None, NaN and "unknown" states
# only keep real values
try:
with suppress(ValueError):
self._brightness_history.add_measurement(
int(state.state), state.last_updated
)
except ValueError:
pass
_LOGGER.debug("Initializing from database completed")
@property

View file

@ -1,5 +1,6 @@
"""Support for PlayStation 4 consoles."""
import asyncio
from contextlib import suppress
import logging
from pyps4_2ndscreen.errors import NotReady, PSDataIncomplete
@ -142,10 +143,8 @@ class PS4Device(MediaPlayerEntity):
and not self._ps4.is_standby
and self._ps4.is_available
):
try:
with suppress(NotReady):
await self._ps4.async_connect()
except NotReady:
pass
# Try to ensure correct status is set on startup for device info.
if self._ps4.ddp_protocol is None:

View file

@ -1,5 +1,6 @@
"""Integration with the Rachio Iro sprinkler system controller."""
from abc import abstractmethod
from contextlib import suppress
from datetime import timedelta
import logging
@ -525,7 +526,7 @@ class RachioSchedule(RachioSwitch):
def _async_handle_update(self, *args, **kwargs) -> None:
"""Handle incoming webhook schedule data."""
# Schedule ID not passed when running individual zones, so we catch that error
try:
with suppress(KeyError):
if args[0][KEY_SCHEDULE_ID] == self._schedule_id:
if args[0][KEY_SUBTYPE] in [SUBTYPE_SCHEDULE_STARTED]:
self._state = True
@ -534,8 +535,6 @@ class RachioSchedule(RachioSwitch):
SUBTYPE_SCHEDULE_COMPLETED,
]:
self._state = False
except KeyError:
pass
self.async_write_ha_state()

View file

@ -4,6 +4,7 @@ Support for Rejseplanen information from rejseplanen.dk.
For more info on the API see:
https://help.rejseplanen.dk/hc/en-us/articles/214174465-Rejseplanen-s-API
"""
from contextlib import suppress
from datetime import datetime, timedelta
import logging
from operator import itemgetter
@ -147,10 +148,8 @@ class RejseplanenTransportSensor(SensorEntity):
if not self._times:
self._state = None
else:
try:
with suppress(TypeError):
self._state = self._times[0][ATTR_DUE_IN]
except TypeError:
pass
class PublicTransportData:

View file

@ -1,6 +1,7 @@
"""Shark IQ Integration."""
import asyncio
from contextlib import suppress
import async_timeout
from sharkiqpy import (
@ -59,7 +60,7 @@ async def async_setup_entry(hass, config_entry):
raise exceptions.ConfigEntryNotReady from exc
shark_vacs = await ayla_api.async_get_devices(False)
device_names = ", ".join([d.name for d in shark_vacs])
device_names = ", ".join(d.name for d in shark_vacs)
_LOGGER.debug("Found %d Shark IQ device(s): %s", len(shark_vacs), device_names)
coordinator = SharkIqUpdateCoordinator(hass, config_entry, ayla_api, shark_vacs)
@ -81,11 +82,10 @@ async def async_setup_entry(hass, config_entry):
async def async_disconnect_or_timeout(coordinator: SharkIqUpdateCoordinator):
"""Disconnect to vacuum."""
_LOGGER.debug("Disconnecting from Ayla Api")
with async_timeout.timeout(5):
try:
with async_timeout.timeout(5), suppress(
SharkIqAuthError, SharkIqAuthExpiringError, SharkIqNotAuthedError
):
await coordinator.ayla_api.async_sign_out()
except (SharkIqAuthError, SharkIqAuthExpiringError, SharkIqNotAuthedError):
pass
async def async_update_options(hass, config_entry):
@ -105,10 +105,8 @@ async def async_unload_entry(hass, config_entry):
)
if unload_ok:
domain_data = hass.data[DOMAIN][config_entry.entry_id]
try:
with suppress(SharkIqAuthError):
await async_disconnect_or_timeout(coordinator=domain_data)
except SharkIqAuthError:
pass
hass.data[DOMAIN].pop(config_entry.entry_id)
return unload_ok

View file

@ -1,5 +1,6 @@
"""Expose regular shell commands as services."""
import asyncio
from contextlib import suppress
import logging
import shlex
@ -87,10 +88,8 @@ async def async_setup(hass: HomeAssistantType, config: ConfigType) -> bool:
"Timed out running command: `%s`, after: %ss", cmd, COMMAND_TIMEOUT
)
if process:
try:
with suppress(TypeError):
await process.kill()
except TypeError:
pass
del process
return

View file

@ -1,4 +1,5 @@
"""Support for SolarEdge-local Monitoring API."""
from contextlib import suppress
from copy import deepcopy
from datetime import timedelta
import logging
@ -350,19 +351,15 @@ class SolarEdgeData:
self.info["optimizers"] = status.optimizersStatus.total
self.info["invertertemperature"] = INVERTER_MODES[status.status]
try:
with suppress(IndexError):
if status.metersList[1]:
self.data["currentPowerimport"] = status.metersList[1].currentPower
self.data["totalEnergyimport"] = status.metersList[1].totalEnergy
except IndexError:
pass
try:
with suppress(IndexError):
if status.metersList[0]:
self.data["currentPowerexport"] = status.metersList[0].currentPower
self.data["totalEnergyexport"] = status.metersList[0].totalEnergy
except IndexError:
pass
if maintenance.system.name:
self.data["optimizertemperature"] = round(statistics.mean(temperature), 2)

View file

@ -1,4 +1,5 @@
"""Support for media browsing."""
from contextlib import suppress
import logging
import urllib.parse
@ -75,10 +76,8 @@ def build_item_response(media_library, payload, get_thumbnail_url=None):
children = []
for item in media:
try:
with suppress(UnknownMediaType):
children.append(item_payload(item, get_thumbnail_url))
except UnknownMediaType:
pass
return BrowseMedia(
title=title,
@ -136,10 +135,8 @@ def library_payload(media_library, get_thumbnail_url=None):
children = []
for item in media_library.browse():
try:
with suppress(UnknownMediaType):
children.append(item_payload(item, get_thumbnail_url))
except UnknownMediaType:
pass
return BrowseMedia(
title="Music Library",

View file

@ -1,5 +1,6 @@
"""Support to interface with Sonos players."""
import asyncio
from contextlib import suppress
import datetime
import functools as ft
import logging
@ -790,7 +791,7 @@ class SonosEntity(MediaPlayerEntity):
coordinator_uid = self.unique_id
slave_uids = []
try:
with suppress(SoCoException):
if self.soco.group and self.soco.group.coordinator:
coordinator_uid = self.soco.group.coordinator.uid
slave_uids = [
@ -798,8 +799,6 @@ class SonosEntity(MediaPlayerEntity):
for p in self.soco.group.members
if p.uid != coordinator_uid
]
except SoCoException:
pass
return [coordinator_uid] + slave_uids

View file

@ -1,4 +1,6 @@
"""Support for the SpaceAPI."""
from contextlib import suppress
import voluptuous as vol
from homeassistant.components.http import HomeAssistantView
@ -287,13 +289,11 @@ class APISpaceApiView(HomeAssistantView):
else:
state = {ATTR_OPEN: "null", ATTR_LASTCHANGE: 0}
try:
with suppress(KeyError):
state[ATTR_ICON] = {
ATTR_OPEN: spaceapi["state"][CONF_ICON_OPEN],
ATTR_CLOSE: spaceapi["state"][CONF_ICON_CLOSED],
}
except KeyError:
pass
data = {
ATTR_API: SPACEAPI_VERSION,
@ -306,40 +306,26 @@ class APISpaceApiView(HomeAssistantView):
ATTR_URL: spaceapi[CONF_URL],
}
try:
with suppress(KeyError):
data[ATTR_CAM] = spaceapi[CONF_CAM]
except KeyError:
pass
try:
with suppress(KeyError):
data[ATTR_SPACEFED] = spaceapi[CONF_SPACEFED]
except KeyError:
pass
try:
with suppress(KeyError):
data[ATTR_STREAM] = spaceapi[CONF_STREAM]
except KeyError:
pass
try:
with suppress(KeyError):
data[ATTR_FEEDS] = spaceapi[CONF_FEEDS]
except KeyError:
pass
try:
with suppress(KeyError):
data[ATTR_CACHE] = spaceapi[CONF_CACHE]
except KeyError:
pass
try:
with suppress(KeyError):
data[ATTR_PROJECTS] = spaceapi[CONF_PROJECTS]
except KeyError:
pass
try:
with suppress(KeyError):
data[ATTR_RADIO_SHOW] = spaceapi[CONF_RADIO_SHOW]
except KeyError:
pass
if is_sensors is not None:
sensors = {}

View file

@ -1,4 +1,5 @@
"""Support for Swisscom routers (Internet-Box)."""
from contextlib import suppress
import logging
from aiohttp.hdrs import CONTENT_TYPE
@ -97,13 +98,11 @@ class SwisscomDeviceScanner(DeviceScanner):
return devices
for device in request.json()["status"]:
try:
with suppress(KeyError, requests.exceptions.RequestException):
devices[device["Key"]] = {
"ip": device["IPAddress"],
"mac": device["PhysAddress"],
"host": device["Name"],
"status": device["Active"],
}
except (KeyError, requests.exceptions.RequestException):
pass
return devices

View file

@ -1,4 +1,5 @@
"""Support gathering ted5000 information."""
from contextlib import suppress
from datetime import timedelta
import logging
@ -73,10 +74,8 @@ class Ted5000Sensor(SensorEntity):
@property
def state(self):
"""Return the state of the resources."""
try:
with suppress(KeyError):
return self._gateway.data[self._mtu][self._unit]
except KeyError:
pass
def update(self):
"""Get the latest data from REST API."""

View file

@ -1,5 +1,6 @@
"""Support for TPLink HS100/HS110/HS200 smart switch."""
import asyncio
from contextlib import suppress
import logging
import time
@ -151,13 +152,10 @@ class SmartPlugSwitch(SwitchEntity):
)
emeter_statics = self.smartplug.get_emeter_daily()
try:
with suppress(KeyError): # Device returned no daily history
self._emeter_params[ATTR_TODAY_ENERGY_KWH] = "{:.3f}".format(
emeter_statics[int(time.strftime("%e"))]
)
except KeyError:
# Device returned no daily history
pass
return True
except (SmartDeviceException, OSError) as ex:
if update_attempt == 0:

View file

@ -1,6 +1,8 @@
"""Support for monitoring the Transmission BitTorrent client API."""
from __future__ import annotations
from contextlib import suppress
from transmissionrpc.torrent import Torrent
from homeassistant.components.sensor import SensorEntity
@ -187,8 +189,6 @@ def _torrents_info(torrents, order, limit, statuses=None):
"status": torrent.status,
"id": torrent.id,
}
try:
with suppress(ValueError):
info["eta"] = str(torrent.eta)
except ValueError:
pass
return infos

View file

@ -1,5 +1,6 @@
"""Config flow for UPB PIM integration."""
import asyncio
from contextlib import suppress
import logging
from urllib.parse import urlparse
@ -43,11 +44,8 @@ async def _validate_input(data):
upb.connect(_connected_callback)
try:
with async_timeout.timeout(VALIDATE_TIMEOUT):
with suppress(asyncio.TimeoutError), async_timeout.timeout(VALIDATE_TIMEOUT):
await connected_event.wait()
except asyncio.TimeoutError:
pass
upb.disconnect()

View file

@ -2,6 +2,7 @@
from __future__ import annotations
import asyncio
from contextlib import suppress
import os
from typing import Any
@ -164,10 +165,8 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
return False
cookie_file = hass.config.path(STORAGE_DIR, f"verisure_{entry.entry_id}")
try:
with suppress(FileNotFoundError):
await hass.async_add_executor_job(os.unlink, cookie_file)
except FileNotFoundError:
pass
del hass.data[DOMAIN][entry.entry_id]

View file

@ -1,5 +1,6 @@
"""Support for LG webOS Smart TV."""
import asyncio
from contextlib import suppress
import json
import logging
import os
@ -150,9 +151,7 @@ async def async_setup_tv(hass, config, conf):
async def async_connect(client):
"""Attempt a connection, but fail gracefully if tv is off for example."""
try:
await client.connect()
except (
with suppress(
OSError,
ConnectionClosed,
ConnectionRefusedError,
@ -161,7 +160,7 @@ async def async_connect(client):
PyLGTVPairException,
PyLGTVCmdException,
):
pass
await client.connect()
async def async_setup_tv_finalize(hass, config, conf, client):

View file

@ -1,5 +1,6 @@
"""Support for interface with an LG webOS Smart TV."""
import asyncio
from contextlib import suppress
from datetime import timedelta
from functools import wraps
import logging
@ -214,9 +215,7 @@ class LgWebOSMediaPlayerEntity(MediaPlayerEntity):
async def async_update(self):
"""Connect."""
if not self._client.is_connected():
try:
await self._client.connect()
except (
with suppress(
OSError,
ConnectionClosed,
ConnectionRefusedError,
@ -225,7 +224,7 @@ class LgWebOSMediaPlayerEntity(MediaPlayerEntity):
PyLGTVPairException,
PyLGTVCmdException,
):
pass
await self._client.connect()
@property
def unique_id(self):

View file

@ -1,4 +1,5 @@
"""Support for Wink sensors."""
from contextlib import suppress
import logging
import pywink
@ -87,9 +88,9 @@ class WinkSensorEntity(WinkDevice, SensorEntity):
def extra_state_attributes(self):
"""Return the state attributes."""
super_attrs = super().extra_state_attributes
try:
super_attrs["egg_times"] = self.wink.eggs()
except AttributeError:
# Ignore error, this sensor isn't an eggminder
pass
with suppress(AttributeError):
super_attrs["egg_times"] = self.wink.eggs()
return super_attrs

View file

@ -2,6 +2,7 @@
from __future__ import annotations
import asyncio
from contextlib import suppress
from dataclasses import dataclass
from datetime import timedelta
import logging
@ -248,12 +249,10 @@ class XboxUpdateCoordinator(DataUpdateCoordinator):
def _build_presence_data(person: Person) -> PresenceData:
"""Build presence data from a person."""
active_app: PresenceDetail | None = None
try:
with suppress(StopIteration):
active_app = next(
presence for presence in person.presence_details if presence.is_primary
)
except StopIteration:
pass
return PresenceData(
xuid=person.xuid,

View file

@ -1,6 +1,7 @@
"""Xbox Media Source Implementation."""
from __future__ import annotations
from contextlib import suppress
from dataclasses import dataclass
from pydantic.error_wrappers import ValidationError # pylint: disable=no-name-in-module
@ -138,7 +139,7 @@ class XboxSource(MediaSource):
owner, kind = category.split("#", 1)
items: list[XboxMediaItem] = []
try:
with suppress(ValidationError): # Unexpected API response
if kind == "gameclips":
if owner == "my":
response: GameclipsResponse = (
@ -189,9 +190,6 @@ class XboxSource(MediaSource):
)
for item in response.screenshots
]
except ValidationError:
# Unexpected API response
pass
return BrowseMediaSource(
domain=DOMAIN,

View file

@ -1,4 +1,5 @@
"""Support for Zabbix."""
from contextlib import suppress
import json
import logging
import math
@ -202,7 +203,7 @@ class ZabbixThread(threading.Thread):
dropped = 0
try:
with suppress(queue.Empty):
while len(metrics) < BATCH_BUFFER_SIZE and not self.shutdown:
timeout = None if count == 0 else BATCH_TIMEOUT
item = self.queue.get(timeout=timeout)
@ -223,9 +224,6 @@ class ZabbixThread(threading.Thread):
else:
dropped += 1
except queue.Empty:
pass
if dropped:
_LOGGER.warning("Catching up, dropped %d old events", dropped)

View file

@ -1,6 +1,7 @@
"""Support for exposing Home Assistant via Zeroconf."""
from __future__ import annotations
from contextlib import suppress
import fnmatch
from functools import partial
import ipaddress
@ -187,15 +188,11 @@ def _register_hass_zc_service(hass, zeroconf, uuid):
}
# Get instance URL's
try:
with suppress(NoURLAvailableError):
params["external_url"] = get_url(hass, allow_internal=False)
except NoURLAvailableError:
pass
try:
with suppress(NoURLAvailableError):
params["internal_url"] = get_url(hass, allow_external=False)
except NoURLAvailableError:
pass
# Set old base URL based on external or internal
params["base_url"] = params["external_url"] or params["internal_url"]
@ -380,11 +377,9 @@ def info_from_service(service):
properties["_raw"][key] = value
try:
with suppress(UnicodeDecodeError):
if isinstance(value, bytes):
properties[key] = value.decode("utf-8")
except UnicodeDecodeError:
pass
if not service.addresses:
return None

View file

@ -1,5 +1,6 @@
"""Zeroconf usage utility to warn about multiple instances."""
from contextlib import suppress
import logging
import zeroconf
@ -36,10 +37,8 @@ def _report(what: str) -> None:
"""
integration_frame = None
try:
with suppress(MissingIntegrationFrame):
integration_frame = get_integration_frame(exclude_integrations={"zeroconf"})
except MissingIntegrationFrame:
pass
if not integration_frame:
_LOGGER.warning(

View file

@ -1,6 +1,7 @@
"""Lighting channels module for Zigbee Home Automation."""
from __future__ import annotations
from contextlib import suppress
from typing import Coroutine
import zigpy.zcl.clusters.lighting as lighting
@ -39,10 +40,8 @@ class ColorChannel(ZigbeeChannel):
@property
def color_capabilities(self) -> int:
"""Return color capabilities of the light."""
try:
with suppress(KeyError):
return self.cluster["color_capabilities"]
except KeyError:
pass
if self.cluster.get("color_temperature") is not None:
return self.CAPABILITIES_COLOR_XY | self.CAPABILITIES_COLOR_TEMP
return self.CAPABILITIES_COLOR_XY

View file

@ -2,6 +2,7 @@
from __future__ import annotations
import asyncio
from contextlib import suppress
from ssl import SSLContext
import sys
from typing import Any, Awaitable, cast
@ -37,10 +38,9 @@ def async_get_clientsession(
This method must be run in the event loop.
"""
key = DATA_CLIENTSESSION_NOTVERIFY
if verify_ssl:
key = DATA_CLIENTSESSION
else:
key = DATA_CLIENTSESSION_NOTVERIFY
if key not in hass.data:
hass.data[key] = async_create_clientsession(hass, verify_ssl)
@ -130,7 +130,8 @@ async def async_aiohttp_proxy_stream(
response.content_type = content_type
await response.prepare(request)
try:
# Suppressing something went wrong fetching data, closed connection
with suppress(asyncio.TimeoutError, aiohttp.ClientError):
while hass.is_running:
with async_timeout.timeout(timeout):
data = await stream.read(buffer_size)
@ -139,10 +140,6 @@ async def async_aiohttp_proxy_stream(
break
await response.write(data)
except (asyncio.TimeoutError, aiohttp.ClientError):
# Something went wrong fetching data, closed connection
pass
return response

View file

@ -1,6 +1,7 @@
"""Network helpers."""
from __future__ import annotations
from contextlib import suppress
from ipaddress import ip_address
from typing import cast
@ -56,7 +57,7 @@ def get_url(
for url_type in order:
if allow_internal and url_type == TYPE_URL_INTERNAL:
try:
with suppress(NoURLAvailableError):
return _get_internal_url(
hass,
allow_ip=allow_ip,
@ -64,11 +65,9 @@ def get_url(
require_ssl=require_ssl,
require_standard_port=require_standard_port,
)
except NoURLAvailableError:
pass
if allow_external and url_type == TYPE_URL_EXTERNAL:
try:
with suppress(NoURLAvailableError):
return _get_external_url(
hass,
allow_cloud=allow_cloud,
@ -78,8 +77,6 @@ def get_url(
require_ssl=require_ssl,
require_standard_port=require_standard_port,
)
except NoURLAvailableError:
pass
# For current request, we accept loopback interfaces (e.g., 127.0.0.1),
# the Supervisor hostname and localhost transparently
@ -177,10 +174,8 @@ def _get_external_url(
) -> str:
"""Get external URL of this instance."""
if prefer_cloud and allow_cloud:
try:
with suppress(NoURLAvailableError):
return _get_cloud_url(hass)
except NoURLAvailableError:
pass
if hass.config.external_url:
external_url = yarl.URL(hass.config.external_url)
@ -201,10 +196,8 @@ def _get_external_url(
return normalize_url(str(external_url))
if allow_cloud:
try:
with suppress(NoURLAvailableError):
return _get_cloud_url(hass, require_current_request=require_current_request)
except NoURLAvailableError:
pass
raise NoURLAvailableError

View file

@ -2,7 +2,7 @@
from __future__ import annotations
import asyncio
from contextlib import asynccontextmanager
from contextlib import asynccontextmanager, suppress
from datetime import datetime, timedelta
from functools import partial
import itertools
@ -492,10 +492,8 @@ class _ScriptRun:
async def async_cancel_long_task() -> None:
# Stop long task and wait for it to finish.
long_task.cancel()
try:
with suppress(Exception):
await long_task
except Exception: # pylint: disable=broad-except
pass
# Wait for long task while monitoring for a stop request.
stop_task = self._hass.async_create_task(self._stop.wait())

View file

@ -2,6 +2,7 @@
from __future__ import annotations
import asyncio
from contextlib import suppress
from json import JSONEncoder
import logging
import os
@ -243,7 +244,5 @@ class Store:
self._async_cleanup_delay_listener()
self._async_cleanup_final_write_listener()
try:
with suppress(FileNotFoundError):
await self.hass.async_add_executor_job(os.unlink, self.path)
except FileNotFoundError:
pass

View file

@ -5,6 +5,7 @@ from ast import literal_eval
import asyncio
import base64
import collections.abc
from contextlib import suppress
from datetime import datetime, timedelta
from functools import partial, wraps
import json
@ -513,10 +514,8 @@ class Template:
variables = dict(variables or {})
variables["value"] = value
try:
with suppress(ValueError, TypeError):
variables["value_json"] = json.loads(value)
except (ValueError, TypeError):
pass
try:
return self._compiled.render(variables).strip()

View file

@ -7,6 +7,7 @@ documentation as possible to keep it understandable.
from __future__ import annotations
import asyncio
from contextlib import suppress
import functools as ft
import importlib
import json
@ -587,10 +588,8 @@ def _load_file(
Only returns it if also found to be valid.
Async friendly.
"""
try:
with suppress(KeyError):
return hass.data[DATA_COMPONENTS][comp_or_platform] # type: ignore
except KeyError:
pass
cache = hass.data.get(DATA_COMPONENTS)
if cache is None:

View file

@ -1,6 +1,7 @@
"""Helper methods to handle the time in Home Assistant."""
from __future__ import annotations
from contextlib import suppress
import datetime as dt
import re
from typing import Any, cast
@ -127,10 +128,9 @@ def parse_datetime(dt_str: str) -> dt.datetime | None:
Raises ValueError if the input is well formatted but not a valid datetime.
Returns None if the input isn't well formatted.
"""
try:
with suppress(ValueError, IndexError):
return ciso8601.parse_datetime(dt_str)
except (ValueError, IndexError):
pass
match = DATETIME_RE.match(dt_str)
if not match:
return None

View file

@ -2,6 +2,7 @@
from __future__ import annotations
from collections import OrderedDict
from contextlib import suppress
import logging
import os
from os import O_CREAT, O_TRUNC, O_WRONLY, stat_result
@ -128,10 +129,8 @@ def save_yaml(fname: str, data: JSON_TYPE) -> None:
yaml.dump(data, temp_file)
os.replace(tmp_fname, fname)
if hasattr(os, "chown") and file_stat.st_ctime > -1:
try:
with suppress(OSError):
os.chown(fname, file_stat.st_uid, file_stat.st_gid)
except OSError:
pass
except YAMLError as exc:
_LOGGER.error(str(exc))
raise HomeAssistantError(exc) from exc

View file

@ -1,5 +1,6 @@
"""The tests for generic camera component."""
import asyncio
from contextlib import suppress
from aiohttp.client_exceptions import ClientResponseError
@ -214,10 +215,8 @@ async def test_retries_after_error(aioclient_mock, hass, hass_client):
aioclient_mock.get(radar_map_url(), text=None, status=HTTP_INTERNAL_SERVER_ERROR)
# A 404 should not return data and throw:
try:
with suppress(ClientResponseError):
await client.get("/api/camera_proxy/camera.config_test")
except ClientResponseError:
pass
assert aioclient_mock.call_count == 1

View file

@ -1,4 +1,5 @@
"""The tests for the Demo component."""
from contextlib import suppress
import json
import os
@ -20,10 +21,8 @@ def mock_history(hass):
def demo_cleanup(hass):
"""Clean up device tracker demo file."""
yield
try:
with suppress(FileNotFoundError):
os.remove(hass.config.path(YAML_DEVICES))
except FileNotFoundError:
pass
async def test_setting_up_demo(hass):

View file

@ -1,5 +1,6 @@
"""Test Home Assistant timeout handler."""
import asyncio
from contextlib import suppress
import time
import pytest
@ -232,11 +233,9 @@ async def test_mix_zone_timeout():
timeout = TimeoutManager()
async with timeout.async_timeout(0.1):
try:
with suppress(asyncio.TimeoutError):
async with timeout.async_timeout(0.2, "test"):
await asyncio.sleep(0.4)
except asyncio.TimeoutError:
pass
async def test_mix_zone_timeout_trigger_global():
@ -245,11 +244,9 @@ async def test_mix_zone_timeout_trigger_global():
with pytest.raises(asyncio.TimeoutError):
async with timeout.async_timeout(0.1):
try:
with suppress(asyncio.TimeoutError):
async with timeout.async_timeout(0.1, "test"):
await asyncio.sleep(0.3)
except asyncio.TimeoutError:
pass
await asyncio.sleep(0.3)
@ -259,11 +256,9 @@ async def test_mix_zone_timeout_trigger_global_cool_down():
timeout = TimeoutManager()
async with timeout.async_timeout(0.1, cool_down=0.3):
try:
with suppress(asyncio.TimeoutError):
async with timeout.async_timeout(0.1, "test"):
await asyncio.sleep(0.3)
except asyncio.TimeoutError:
pass
await asyncio.sleep(0.2)
@ -301,11 +296,9 @@ async def test_simple_zone_timeout_freeze_without_timeout_exeption():
with pytest.raises(asyncio.TimeoutError):
async with timeout.async_timeout(0.1):
try:
with suppress(RuntimeError):
async with timeout.async_freeze("test"):
raise RuntimeError()
except RuntimeError:
pass
await asyncio.sleep(0.4)
@ -316,10 +309,8 @@ async def test_simple_zone_timeout_zone_with_timeout_exeption():
with pytest.raises(asyncio.TimeoutError):
async with timeout.async_timeout(0.1):
try:
with suppress(RuntimeError):
async with timeout.async_timeout(0.3, "test"):
raise RuntimeError()
except RuntimeError:
pass
await asyncio.sleep(0.3)