Use f-strings in integrations starting with "A" (#32110)

* Use f-strings in integrations starting with A

* Use f-strings in tests for integrations starting with A

* Fix pylint by renaming variable

* Fix nested for loop in f-string for aprs device_tracker

* Break long lines into multiple short lines

* Break long lines into multiple short lines v2
This commit is contained in:
springstan 2020-02-23 22:38:05 +01:00 committed by GitHub
parent a85808e325
commit 524a1a7587
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
48 changed files with 162 additions and 229 deletions

View file

@ -24,13 +24,7 @@ from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.dispatcher import dispatcher_send
from homeassistant.helpers.entity import Entity
from .const import (
ATTRIBUTION,
DEFAULT_CACHEDB,
DOMAIN,
SIGNAL_CAPTURE_IMAGE,
SIGNAL_TRIGGER_QUICK_ACTION,
)
from .const import ATTRIBUTION, DEFAULT_CACHEDB, DOMAIN
_LOGGER = logging.getLogger(__name__)
@ -193,7 +187,7 @@ def setup_hass_services(hass):
]
for entity_id in target_entities:
signal = SIGNAL_CAPTURE_IMAGE.format(entity_id)
signal = f"abode_camera_capture_{entity_id}"
dispatcher_send(hass, signal)
def trigger_quick_action(call):
@ -207,7 +201,7 @@ def setup_hass_services(hass):
]
for entity_id in target_entities:
signal = SIGNAL_TRIGGER_QUICK_ACTION.format(entity_id)
signal = f"abode_trigger_quick_action_{entity_id}"
dispatcher_send(hass, signal)
hass.services.register(

View file

@ -8,7 +8,7 @@ from homeassistant.components.binary_sensor import BinarySensorDevice
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from . import AbodeAutomation, AbodeDevice
from .const import DOMAIN, SIGNAL_TRIGGER_QUICK_ACTION
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
@ -60,7 +60,7 @@ class AbodeQuickActionBinarySensor(AbodeAutomation, BinarySensorDevice):
async def async_added_to_hass(self):
"""Subscribe Abode events."""
await super().async_added_to_hass()
signal = SIGNAL_TRIGGER_QUICK_ACTION.format(self.entity_id)
signal = f"abode_trigger_quick_action_{self.entity_id}"
async_dispatcher_connect(self.hass, signal, self.trigger)
def trigger(self):

View file

@ -11,7 +11,7 @@ from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.util import Throttle
from . import AbodeDevice
from .const import DOMAIN, SIGNAL_CAPTURE_IMAGE
from .const import DOMAIN
MIN_TIME_BETWEEN_UPDATES = timedelta(seconds=90)
@ -50,7 +50,7 @@ class AbodeCamera(AbodeDevice, Camera):
self._capture_callback,
)
signal = SIGNAL_CAPTURE_IMAGE.format(self.entity_id)
signal = f"abode_camera_capture_{self.entity_id}"
async_dispatcher_connect(self.hass, signal, self.capture)
def capture(self):

View file

@ -3,6 +3,3 @@ DOMAIN = "abode"
ATTRIBUTION = "Data provided by goabode.com"
DEFAULT_CACHEDB = "abodepy_cache.pickle"
SIGNAL_CAPTURE_IMAGE = "abode_camera_capture_{}"
SIGNAL_TRIGGER_QUICK_ACTION = "abode_trigger_quick_action_{}"

View file

@ -44,9 +44,7 @@ class AbodeSensor(AbodeDevice):
"""Initialize a sensor for an Abode device."""
super().__init__(data, device)
self._sensor_type = sensor_type
self._name = "{0} {1}".format(
self._device.name, SENSOR_TYPES[self._sensor_type][0]
)
self._name = f"{self._device.name} {SENSOR_TYPES[self._sensor_type][0]}"
self._device_class = SENSOR_TYPES[self._sensor_type][1]
@property

View file

@ -185,7 +185,7 @@ class AirVisualSensor(Entity):
@property
def name(self):
"""Return the name."""
return "{0} {1}".format(SENSOR_LOCALES[self._locale], self._name)
return f"{SENSOR_LOCALES[self._locale]} {self._name}"
@property
def state(self):

View file

@ -53,9 +53,7 @@ def setup_platform(hass, config, add_entities, discovery_info=None):
except (TypeError, KeyError, NameError, ValueError) as ex:
_LOGGER.error("%s", ex)
hass.components.persistent_notification.create(
"Error: {}<br />"
"You will need to restart hass after fixing."
"".format(ex),
"Error: {ex}<br />You will need to restart hass after fixing.",
title=NOTIFICATION_TITLE,
notification_id=NOTIFICATION_ID,
)

View file

@ -31,7 +31,6 @@ from homeassistant.util.dt import now
_LOGGER = logging.getLogger(__name__)
DOMAIN = "alert"
ENTITY_ID_FORMAT = DOMAIN + ".{}"
CONF_CAN_ACK = "can_acknowledge"
CONF_NOTIFIERS = "notifiers"
@ -200,7 +199,7 @@ class Alert(ToggleEntity):
self._ack = False
self._cancel = None
self._send_done_message = False
self.entity_id = ENTITY_ID_FORMAT.format(entity_id)
self.entity_id = f"{DOMAIN}.{entity_id}"
event.async_track_state_change(
hass, watched_entity_id, self.watched_entity_change

View file

@ -478,8 +478,8 @@ async def async_api_select_input(hass, config, directive, context):
media_input = source
break
else:
msg = "failed to map input {} to a media source on {}".format(
media_input, entity.entity_id
msg = (
f"failed to map input {media_input} to a media source on {entity.entity_id}"
)
raise AlexaInvalidValueError(msg)
@ -1225,7 +1225,7 @@ async def async_api_adjust_range(hass, config, directive, context):
service = SERVICE_SET_COVER_POSITION
current = entity.attributes.get(cover.ATTR_POSITION)
if not current:
msg = "Unable to determine {} current position".format(entity.entity_id)
msg = f"Unable to determine {entity.entity_id} current position"
raise AlexaInvalidValueError(msg)
position = response_value = min(100, max(0, range_delta + current))
if position == 100:
@ -1241,9 +1241,7 @@ async def async_api_adjust_range(hass, config, directive, context):
service = SERVICE_SET_COVER_TILT_POSITION
current = entity.attributes.get(cover.ATTR_TILT_POSITION)
if not current:
msg = "Unable to determine {} current tilt position".format(
entity.entity_id
)
msg = f"Unable to determine {entity.entity_id} current tilt position"
raise AlexaInvalidValueError(msg)
tilt_position = response_value = min(100, max(0, range_delta + current))
if tilt_position == 100:
@ -1439,9 +1437,7 @@ async def async_api_set_eq_mode(hass, config, directive, context):
if sound_mode_list and mode.lower() in sound_mode_list:
data[media_player.const.ATTR_SOUND_MODE] = mode.lower()
else:
msg = "failed to map sound mode {} to a mode on {}".format(
mode, entity.entity_id
)
msg = f"failed to map sound mode {mode} to a mode on {entity.entity_id}"
raise AlexaInvalidValueError(msg)
await hass.services.async_call(

View file

@ -30,7 +30,6 @@ from .const import (
CONF_APP_KEY,
DATA_CLIENT,
DOMAIN,
TOPIC_UPDATE,
TYPE_BINARY_SENSOR,
TYPE_SENSOR,
)
@ -378,7 +377,9 @@ class AmbientStation:
if data != self.stations[mac_address][ATTR_LAST_DATA]:
_LOGGER.debug("New data received: %s", data)
self.stations[mac_address][ATTR_LAST_DATA] = data
async_dispatcher_send(self._hass, TOPIC_UPDATE.format(mac_address))
async_dispatcher_send(
self._hass, f"ambient_station_data_update_{mac_address}"
)
_LOGGER.debug("Resetting watchdog")
self._watchdog_listener()
@ -518,7 +519,7 @@ class AmbientWeatherEntity(Entity):
self.async_schedule_update_ha_state(True)
self._async_unsub_dispatcher_connect = async_dispatcher_connect(
self.hass, TOPIC_UPDATE.format(self._mac_address), update
self.hass, f"ambient_station_data_update_{self._mac_address}", update
)
async def async_will_remove_from_hass(self):

View file

@ -8,7 +8,5 @@ CONF_APP_KEY = "app_key"
DATA_CLIENT = "data_client"
TOPIC_UPDATE = "ambient_station_data_update_{0}"
TYPE_BINARY_SENSOR = "binary_sensor"
TYPE_SENSOR = "sensor"

View file

@ -54,7 +54,7 @@ class AmcrestBinarySensor(BinarySensorDevice):
def __init__(self, name, device, sensor_type):
"""Initialize entity."""
self._name = "{} {}".format(name, BINARY_SENSORS[sensor_type][0])
self._name = f"{name} {BINARY_SENSORS[sensor_type][0]}"
self._signal_name = name
self._api = device.api
self._sensor_type = sensor_type

View file

@ -491,9 +491,7 @@ class AmcrestCam(Camera):
"""Enable or disable indicator light."""
try:
self._api.command(
"configManager.cgi?action=setConfig&LightGlobal[0].Enable={}".format(
str(enable).lower()
)
f"configManager.cgi?action=setConfig&LightGlobal[0].Enable={str(enable).lower()}"
)
except AmcrestError as error:
log_update_error(

View file

@ -6,7 +6,7 @@ def service_signal(service, ident=None):
"""Encode service and identifier into signal."""
signal = f"{DOMAIN}_{service}"
if ident:
signal += "_{}".format(ident.replace(".", "_"))
signal += f"_{ident.replace('.', '_')}"
return signal

View file

@ -45,7 +45,7 @@ class AmcrestSensor(Entity):
def __init__(self, name, device, sensor_type):
"""Initialize a sensor for Amcrest camera."""
self._name = "{} {}".format(name, SENSORS[sensor_type][0])
self._name = f"{name} {SENSORS[sensor_type][0]}"
self._signal_name = name
self._api = device.api
self._sensor_type = sensor_type
@ -98,15 +98,21 @@ class AmcrestSensor(Entity):
elif self._sensor_type == SENSOR_SDCARD:
storage = self._api.storage_all
try:
self._attrs["Total"] = "{:.2f} {}".format(*storage["total"])
self._attrs[
"Total"
] = f"{storage['total'][0]:.2f} {storage['total'][1]}"
except ValueError:
self._attrs["Total"] = "{} {}".format(*storage["total"])
self._attrs[
"Total"
] = f"{storage['total'][0]} {storage['total'][1]}"
try:
self._attrs["Used"] = "{:.2f} {}".format(*storage["used"])
self._attrs[
"Used"
] = f"{storage['used'][0]:.2f} {storage['used'][1]}"
except ValueError:
self._attrs["Used"] = "{} {}".format(*storage["used"])
self._attrs["Used"] = f"{storage['used'][0]} {storage['used'][1]}"
try:
self._state = "{:.2f}".format(storage["used_percent"])
self._state = f"{storage['used_percent']:.2f}"
except ValueError:
self._state = storage["used_percent"]
except AmcrestError as error:

View file

@ -75,9 +75,7 @@ class PwrCtrlSwitch(SwitchDevice):
@property
def unique_id(self):
"""Return the unique ID of the device."""
return "{device}-{switch_idx}".format(
device=self._port.device.host, switch_idx=self._port.get_index()
)
return f"{self._port.device.host}-{self._port.get_index()}"
@property
def name(self):

View file

@ -26,7 +26,6 @@ from homeassistant.const import (
URL_API_EVENTS,
URL_API_SERVICES,
URL_API_STATES,
URL_API_STATES_ENTITY,
URL_API_STREAM,
URL_API_TEMPLATE,
__version__,
@ -254,7 +253,7 @@ class APIEntityStateView(HomeAssistantView):
status_code = HTTP_CREATED if is_new_state else 200
resp = self.json(hass.states.get(entity_id), status_code)
resp.headers.add("Location", URL_API_STATES_ENTITY.format(entity_id))
resp.headers.add("Location", f"/api/states/{entity_id}")
return resp

View file

@ -88,16 +88,15 @@ def request_configuration(hass, config, atv, credentials):
try:
await atv.airplay.finish_authentication(pin)
hass.components.persistent_notification.async_create(
"Authentication succeeded!<br /><br />Add the following "
"to credentials: in your apple_tv configuration:<br /><br />"
"{0}".format(credentials),
f"Authentication succeeded!<br /><br />"
f"Add the following to credentials: "
f"in your apple_tv configuration:<br /><br />{credentials}",
title=NOTIFICATION_AUTH_TITLE,
notification_id=NOTIFICATION_AUTH_ID,
)
except DeviceAuthenticationError as ex:
hass.components.persistent_notification.async_create(
"Authentication failed! Did you enter correct PIN?<br /><br />"
"Details: {0}".format(ex),
f"Authentication failed! Did you enter correct PIN?<br /><br />Details: {ex}",
title=NOTIFICATION_AUTH_TITLE,
notification_id=NOTIFICATION_AUTH_ID,
)
@ -124,9 +123,7 @@ async def scan_apple_tvs(hass):
if login_id is None:
login_id = "Home Sharing disabled"
devices.append(
"Name: {0}<br />Host: {1}<br />Login ID: {2}".format(
atv.name, atv.address, login_id
)
f"Name: {atv.name}<br />Host: {atv.address}<br />Login ID: {login_id}"
)
if not devices:

View file

@ -57,7 +57,7 @@ PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
def make_filter(callsigns: list) -> str:
"""Make a server-side filter from a list of callsigns."""
return " ".join("b/{0}".format(cs.upper()) for cs in callsigns)
return " ".join(f"b/{sign.upper()}" for sign in callsigns)
def gps_accuracy(gps, posambiguity: int) -> int:

View file

@ -70,7 +70,7 @@ class AquaLogicSensor(Entity):
@property
def name(self):
"""Return the name of the sensor."""
return "AquaLogic {}".format(SENSOR_TYPES[self._type][0])
return f"AquaLogic {SENSOR_TYPES[self._type][0]}"
@property
def unit_of_measurement(self):

View file

@ -70,7 +70,7 @@ class AquaLogicSwitch(SwitchDevice):
@property
def name(self):
"""Return the name of the switch."""
return "AquaLogic {}".format(SWITCH_TYPES[self._type])
return f"AquaLogic {SWITCH_TYPES[self._type]}"
@property
def should_poll(self):

View file

@ -44,9 +44,9 @@ def _optional_zone(value):
def _zone_name_validator(config):
for zone, zone_config in config[CONF_ZONE].items():
if CONF_NAME not in zone_config:
zone_config[CONF_NAME] = "{} ({}:{}) - {}".format(
DEFAULT_NAME, config[CONF_HOST], config[CONF_PORT], zone
)
zone_config[
CONF_NAME
] = f"{DEFAULT_NAME} ({config[CONF_HOST]}:{config[CONF_PORT]}) - {zone}"
return config

View file

@ -140,7 +140,7 @@ class ArestSensor(Entity):
"""Initialize the sensor."""
self.arest = arest
self._resource = resource
self._name = "{} {}".format(location.title(), name.title())
self._name = f"{location.title()} {name.title()}"
self._variable = variable
self._pin = pin
self._state = None
@ -204,8 +204,7 @@ class ArestData:
try:
if str(self._pin[0]) == "A":
response = requests.get(
"{}/analog/{}".format(self._resource, self._pin[1:]),
timeout=10,
f"{self._resource,}/analog/{self._pin[1:]}", timeout=10
)
self.data = {"value": response.json()["return_value"]}
except TypeError:

View file

@ -86,7 +86,7 @@ class ArestSwitchBase(SwitchDevice):
def __init__(self, resource, location, name):
"""Initialize the switch."""
self._resource = resource
self._name = "{} {}".format(location.title(), name.title())
self._name = f"{location.title()} {name.title()}"
self._state = None
self._available = True

View file

@ -67,9 +67,7 @@ def setup(hass, config):
except (ConnectTimeout, HTTPError) as ex:
_LOGGER.error("Unable to connect to Netgear Arlo: %s", str(ex))
hass.components.persistent_notification.create(
"Error: {}<br />"
"You will need to restart hass after fixing."
"".format(ex),
f"Error: {ex}<br />You will need to restart hass after fixing.",
title=NOTIFICATION_TITLE,
notification_id=NOTIFICATION_ID,
)

View file

@ -81,8 +81,9 @@ class ArloCam(Camera):
video = self._camera.last_video
if not video:
error_msg = "Video not found for {0}. Is it older than {1} days?".format(
self.name, self._camera.min_days_vdo_cache
error_msg = (
f"Video not found for {self.name}. "
f"Is it older than {self._camera.min_days_vdo_cache} days?"
)
_LOGGER.error(error_msg)
return

View file

@ -57,7 +57,7 @@ def setup_platform(hass, config, add_entities, discovery_info=None):
if sensor_type in ("temperature", "humidity", "air_quality"):
continue
name = "{0} {1}".format(SENSOR_TYPES[sensor_type][0], camera.name)
name = f"{SENSOR_TYPES[sensor_type][0]} {camera.name}"
sensors.append(ArloSensor(name, camera, sensor_type))
for base_station in arlo.base_stations:
@ -65,9 +65,7 @@ def setup_platform(hass, config, add_entities, discovery_info=None):
sensor_type in ("temperature", "humidity", "air_quality")
and base_station.model_id == "ABC1000"
):
name = "{0} {1}".format(
SENSOR_TYPES[sensor_type][0], base_station.name
)
name = f"{SENSOR_TYPES[sensor_type][0]} {base_station.name}"
sensors.append(ArloSensor(name, base_station, sensor_type))
add_entities(sensors, True)
@ -83,7 +81,7 @@ class ArloSensor(Entity):
self._data = device
self._sensor_type = sensor_type
self._state = None
self._icon = "mdi:{}".format(SENSOR_TYPES.get(self._sensor_type)[2])
self._icon = f"mdi:{SENSOR_TYPES.get(self._sensor_type)[2]}"
@property
def name(self):
@ -141,8 +139,9 @@ class ArloSensor(Entity):
video = self._data.last_video
self._state = video.created_at_pretty("%m-%d-%Y %H:%M:%S")
except (AttributeError, IndexError):
error_msg = "Video not found for {0}. Older than {1} days?".format(
self.name, self._data.min_days_vdo_cache
error_msg = (
f"Video not found for {self.name}. "
f"Older than {self._data.min_days_vdo_cache} days?"
)
_LOGGER.debug(error_msg)
self._state = None

View file

@ -84,8 +84,8 @@ class ArubaDeviceScanner(DeviceScanner):
def get_aruba_data(self):
"""Retrieve data from Aruba Access Point and return parsed result."""
connect = "ssh {}@{}"
ssh = pexpect.spawn(connect.format(self.username, self.host))
connect = f"ssh {self.username}@{self.host}"
ssh = pexpect.spawn(connect)
query = ssh.expect(
[
"password:",

View file

@ -50,7 +50,7 @@ def discover_sensors(topic, payload):
def _slug(name):
return "sensor.arwn_{}".format(slugify(name))
return f"sensor.arwn_{slugify(name)}"
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):

View file

@ -49,8 +49,10 @@ class AsteriskCDR(Mailbox):
"duration": entry["duration"],
}
sha = hashlib.sha256(str(entry).encode("utf-8")).hexdigest()
msg = "Destination: {}\nApplication: {}\n Context: {}".format(
entry["dest"], entry["application"], entry["context"]
msg = (
f"Destination: {entry['dest']}\n"
f"Application: {entry['application']}\n "
f"Context: {entry['context']}"
)
cdr.append({"info": info, "sha": sha, "text": msg})
self.cdr = cdr

View file

@ -102,8 +102,7 @@ def request_configuration(hass, config, api, authenticator, token_refresh_lock):
_CONFIGURING[DOMAIN] = configurator.request_config(
NOTIFICATION_TITLE,
august_configuration_callback,
description="Please check your {} ({}) and enter the verification "
"code below".format(login_method, username),
description=f"Please check your {login_method} ({username}) and enter the verification code below",
submit_caption="Verify",
fields=[
{"id": "verification_code", "name": "Verification code", "type": "string"}
@ -121,9 +120,7 @@ def setup_august(hass, config, api, authenticator, token_refresh_lock):
_LOGGER.error("Unable to connect to August service: %s", str(ex))
hass.components.persistent_notification.create(
"Error: {}<br />"
"You will need to restart hass after fixing."
"".format(ex),
"Error: {ex}<br />You will need to restart hass after fixing.",
title=NOTIFICATION_TITLE,
notification_id=NOTIFICATION_ID,
)

View file

@ -70,14 +70,10 @@ async def async_setup_platform(hass, config, async_add_entities, discovery_info=
for door in data.locks:
if not data.lock_has_doorsense(door.device_id):
_LOGGER.debug(
"Not adding sensor class door for lock %s ", door.device_name,
)
_LOGGER.debug("Not adding sensor class door for lock %s ", door.device_name)
continue
_LOGGER.debug(
"Adding sensor class door for %s", door.device_name,
)
_LOGGER.debug("Adding sensor class door for %s", door.device_name)
devices.append(AugustDoorBinarySensor(data, "door_open", door))
for doorbell in data.doorbells:
@ -121,7 +117,7 @@ class AugustDoorBinarySensor(BinarySensorDevice):
@property
def name(self):
"""Return the name of the binary sensor."""
return "{} Open".format(self._door.device_name)
return f"{self._door.device_name} Open"
async def async_update(self):
"""Get the latest state of the sensor and update activity."""
@ -175,10 +171,7 @@ class AugustDoorbellBinarySensor(BinarySensorDevice):
@property
def name(self):
"""Return the name of the binary sensor."""
return "{} {}".format(
self._doorbell.device_name,
SENSOR_TYPES_DOORBELL[self._sensor_type][SENSOR_NAME],
)
return f"{self._doorbell.device_name} {SENSOR_TYPES_DOORBELL[self._sensor_type][SENSOR_NAME]}"
async def async_update(self):
"""Get the latest state of the sensor."""
@ -194,7 +187,7 @@ class AugustDoorbellBinarySensor(BinarySensorDevice):
@property
def unique_id(self) -> str:
"""Get the unique id of the doorbell sensor."""
return "{:s}_{:s}".format(
self._doorbell.device_id,
SENSOR_TYPES_DOORBELL[self._sensor_type][SENSOR_NAME].lower(),
return (
f"{self._doorbell.device_id}_"
f"{SENSOR_TYPES_DOORBELL[self._sensor_type][SENSOR_NAME].lower()}"
)

View file

@ -96,7 +96,6 @@ class AuroraABBSolarPVMonitorSensor(Entity):
if "No response after" in str(error):
_LOGGER.debug("No response from inverter (could be dark)")
else:
# print("Exception!!: {}".format(str(e)))
raise error
self._state = None
finally:

View file

@ -28,7 +28,6 @@ from homeassistant.helpers.event import async_track_time_interval
_LOGGER = logging.getLogger(__name__)
ATTR_FUEL_LEVEL = "fuel_level"
AUTOMATIC_CONFIG_FILE = ".automatic/session-{}.json"
CONF_CLIENT_ID = "client_id"
CONF_CURRENT_LOCATION = "current_location"
@ -95,7 +94,7 @@ def async_setup_scanner(hass, config, async_see, discovery_info=None):
request_kwargs={"timeout": DEFAULT_TIMEOUT},
)
filename = AUTOMATIC_CONFIG_FILE.format(config[CONF_CLIENT_ID])
filename = f".automatic/session-{config[CONF_CLIENT_ID]}.json"
refresh_token = yield from hass.async_add_job(
_get_refresh_token_from_file, hass, filename
)

View file

@ -72,9 +72,7 @@ AutomationActionType = Callable[[HomeAssistant, TemplateVarsType], Awaitable[Non
def _platform_validator(config):
"""Validate it is a valid platform."""
try:
platform = importlib.import_module(
".{}".format(config[CONF_PLATFORM]), __name__
)
platform = importlib.import_module(f".{config[CONF_PLATFORM]}", __name__)
except ImportError:
raise vol.Invalid("Invalid platform specified") from None
@ -221,7 +219,7 @@ async def async_setup(hass, config):
await _async_process_config(hass, conf, component)
async_register_admin_service(
hass, DOMAIN, SERVICE_RELOAD, reload_service_handler, schema=vol.Schema({}),
hass, DOMAIN, SERVICE_RELOAD, reload_service_handler, schema=vol.Schema({})
)
return True
@ -456,9 +454,7 @@ class AutomationEntity(ToggleEntity, RestoreEntity):
info = {"name": self._name}
for conf in self._trigger_config:
platform = importlib.import_module(
".{}".format(conf[CONF_PLATFORM]), __name__
)
platform = importlib.import_module(f".{conf[CONF_PLATFORM]}", __name__)
remove = await platform.async_attach_trigger(
self.hass, conf, self.async_trigger, info

View file

@ -26,7 +26,7 @@ async def async_validate_config_item(hass, config, full_config=None):
triggers = []
for trigger in config[CONF_TRIGGER]:
trigger_platform = importlib.import_module(
"..{}".format(trigger[CONF_PLATFORM]), __name__
f"..{trigger[CONF_PLATFORM]}", __name__
)
if hasattr(trigger_platform, "async_validate_trigger_config"):
trigger = await trigger_platform.async_validate_trigger_config(

View file

@ -79,8 +79,8 @@ class AxisBinarySensor(AxisEventBase, BinarySensorDevice):
and self.event.id
and self.device.api.vapix.ports[self.event.id].name
):
return "{} {}".format(
self.device.name, self.device.api.vapix.ports[self.event.id].name
return (
f"{self.device.name} {self.device.api.vapix.ports[self.event.id].name}"
)
return super().name

View file

@ -21,10 +21,6 @@ from homeassistant.helpers.dispatcher import async_dispatcher_connect
from .axis_base import AxisEntityBase
from .const import DOMAIN as AXIS_DOMAIN
AXIS_IMAGE = "http://{}:{}/axis-cgi/jpg/image.cgi"
AXIS_VIDEO = "http://{}:{}/axis-cgi/mjpg/video.cgi"
AXIS_STREAM = "rtsp://{}:{}@{}/axis-media/media.amp?videocodec=h264"
async def async_setup_entry(hass, config_entry, async_add_entities):
"""Set up the Axis camera video stream."""
@ -36,11 +32,13 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
CONF_NAME: config_entry.data[CONF_NAME],
CONF_USERNAME: config_entry.data[CONF_USERNAME],
CONF_PASSWORD: config_entry.data[CONF_PASSWORD],
CONF_MJPEG_URL: AXIS_VIDEO.format(
config_entry.data[CONF_HOST], config_entry.data[CONF_PORT],
CONF_MJPEG_URL: (
f"http://{config_entry.data[CONF_HOST]}"
f":{config_entry.data[CONF_PORT]}/axis-cgi/mjpg/video.cgi"
),
CONF_STILL_IMAGE_URL: AXIS_IMAGE.format(
config_entry.data[CONF_HOST], config_entry.data[CONF_PORT],
CONF_STILL_IMAGE_URL: (
f"http://{config_entry.data[CONF_HOST]}"
f":{config_entry.data[CONF_PORT]}/axis-cgi/jpg/image.cgi"
),
CONF_AUTHENTICATION: HTTP_DIGEST_AUTHENTICATION,
}
@ -72,17 +70,19 @@ class AxisCamera(AxisEntityBase, MjpegCamera):
async def stream_source(self):
"""Return the stream source."""
return AXIS_STREAM.format(
self.device.config_entry.data[CONF_USERNAME],
self.device.config_entry.data[CONF_PASSWORD],
self.device.host,
return (
f"rtsp://{self.device.config_entry.data[CONF_USERNAME]}´"
f":{self.device.config_entry.data[CONF_PASSWORD]}"
f"@{self.device.host}/axis-media/media.amp?videocodec=h264"
)
def _new_address(self):
"""Set new device address for video stream."""
port = self.device.config_entry.data[CONF_PORT]
self._mjpeg_url = AXIS_VIDEO.format(self.device.host, port)
self._still_image_url = AXIS_IMAGE.format(self.device.host, port)
self._mjpeg_url = (f"http://{self.device.host}:{port}/axis-cgi/mjpg/video.cgi",)
self._still_image_url = (
f"http://{self.device.host}:{port}/axis-cgi/jpg/image.cgi"
)
@property
def unique_id(self):

View file

@ -53,8 +53,8 @@ class AxisSwitch(AxisEventBase, SwitchDevice):
def name(self):
"""Return the name of the event."""
if self.event.id and self.device.api.vapix.ports[self.event.id].name:
return "{} {}".format(
self.device.name, self.device.api.vapix.ports[self.event.id].name
return (
f"{self.device.name} {self.device.api.vapix.ports[self.event.id].name}"
)
return super().name

View file

@ -47,8 +47,9 @@ async def async_setup(hass: HomeAssistant, yaml_config: Dict[str, Any]):
"""Activate Azure EH component."""
config = yaml_config[DOMAIN]
event_hub_address = "amqps://{}.servicebus.windows.net/{}".format(
config[CONF_EVENT_HUB_NAMESPACE], config[CONF_EVENT_HUB_INSTANCE_NAME]
event_hub_address = (
f"amqps://{config[CONF_EVENT_HUB_NAMESPACE]}"
f".servicebus.windows.net/{config[CONF_EVENT_HUB_INSTANCE_NAME]}"
)
entities_filter = config[CONF_FILTER]

View file

@ -40,11 +40,9 @@ async def test_show_authenticate_form(hass):
async def test_connection_error(hass, aioclient_mock):
"""Test we show user form on AdGuard Home connection error."""
aioclient_mock.get(
"{}://{}:{}/control/status".format(
"https" if FIXTURE_USER_INPUT[CONF_SSL] else "http",
FIXTURE_USER_INPUT[CONF_HOST],
FIXTURE_USER_INPUT[CONF_PORT],
),
f"{'https' if FIXTURE_USER_INPUT[CONF_SSL] else 'http'}"
f"://{FIXTURE_USER_INPUT[CONF_HOST]}"
f":{FIXTURE_USER_INPUT[CONF_PORT]}/control/status",
exc=aiohttp.ClientError,
)
@ -60,11 +58,9 @@ async def test_connection_error(hass, aioclient_mock):
async def test_full_flow_implementation(hass, aioclient_mock):
"""Test registering an integration and finishing flow works."""
aioclient_mock.get(
"{}://{}:{}/control/status".format(
"https" if FIXTURE_USER_INPUT[CONF_SSL] else "http",
FIXTURE_USER_INPUT[CONF_HOST],
FIXTURE_USER_INPUT[CONF_PORT],
),
f"{'https' if FIXTURE_USER_INPUT[CONF_SSL] else 'http'}"
f"://{FIXTURE_USER_INPUT[CONF_HOST]}"
f":{FIXTURE_USER_INPUT[CONF_PORT]}/control/status",
json={"version": "v0.99.0"},
headers={"Content-Type": "application/json"},
)
@ -244,11 +240,9 @@ async def test_hassio_connection_error(hass, aioclient_mock):
async def test_outdated_adguard_version(hass, aioclient_mock):
"""Test we show abort when connecting with unsupported AdGuard version."""
aioclient_mock.get(
"{}://{}:{}/control/status".format(
"https" if FIXTURE_USER_INPUT[CONF_SSL] else "http",
FIXTURE_USER_INPUT[CONF_HOST],
FIXTURE_USER_INPUT[CONF_PORT],
),
f"{'https' if FIXTURE_USER_INPUT[CONF_SSL] else 'http'}"
f"://{FIXTURE_USER_INPUT[CONF_HOST]}"
f":{FIXTURE_USER_INPUT[CONF_PORT]}/control/status",
json={"version": "v0.98.0"},
headers={"Content-Type": "application/json"},
)

View file

@ -207,20 +207,18 @@ async def test_if_fires_on_state_change(hass, calls):
hass.states.async_set("alarm_control_panel.entity", STATE_ALARM_TRIGGERED)
await hass.async_block_till_done()
assert len(calls) == 1
assert calls[0].data[
"some"
] == "triggered - device - {} - pending - triggered - None".format(
"alarm_control_panel.entity"
assert (
calls[0].data["some"]
== "triggered - device - alarm_control_panel.entity - pending - triggered - None"
)
# Fake that the entity is disarmed.
hass.states.async_set("alarm_control_panel.entity", STATE_ALARM_DISARMED)
await hass.async_block_till_done()
assert len(calls) == 2
assert calls[1].data[
"some"
] == "disarmed - device - {} - triggered - disarmed - None".format(
"alarm_control_panel.entity"
assert (
calls[1].data["some"]
== "disarmed - device - alarm_control_panel.entity - triggered - disarmed - None"
)
# Fake that the entity is armed home.
@ -228,10 +226,9 @@ async def test_if_fires_on_state_change(hass, calls):
hass.states.async_set("alarm_control_panel.entity", STATE_ALARM_ARMED_HOME)
await hass.async_block_till_done()
assert len(calls) == 3
assert calls[2].data[
"some"
] == "armed_home - device - {} - pending - armed_home - None".format(
"alarm_control_panel.entity"
assert (
calls[2].data["some"]
== "armed_home - device - alarm_control_panel.entity - pending - armed_home - None"
)
# Fake that the entity is armed away.
@ -239,10 +236,9 @@ async def test_if_fires_on_state_change(hass, calls):
hass.states.async_set("alarm_control_panel.entity", STATE_ALARM_ARMED_AWAY)
await hass.async_block_till_done()
assert len(calls) == 4
assert calls[3].data[
"some"
] == "armed_away - device - {} - pending - armed_away - None".format(
"alarm_control_panel.entity"
assert (
calls[3].data["some"]
== "armed_away - device - alarm_control_panel.entity - pending - armed_away - None"
)
# Fake that the entity is armed night.
@ -250,8 +246,7 @@ async def test_if_fires_on_state_change(hass, calls):
hass.states.async_set("alarm_control_panel.entity", STATE_ALARM_ARMED_NIGHT)
await hass.async_block_till_done()
assert len(calls) == 5
assert calls[4].data[
"some"
] == "armed_night - device - {} - pending - armed_night - None".format(
"alarm_control_panel.entity"
assert (
calls[4].data["some"]
== "armed_night - device - alarm_control_panel.entity - pending - armed_night - None"
)

View file

@ -60,7 +60,7 @@ TEST_NOACK = [
None,
None,
]
ENTITY_ID = alert.ENTITY_ID_FORMAT.format(NAME)
ENTITY_ID = f"{alert.DOMAIN}.{NAME}"
def turn_on(hass, entity_id):

View file

@ -63,7 +63,7 @@ def alexa_client(loop, hass, hass_client):
def _flash_briefing_req(client, briefing_id):
return client.get("/api/alexa/flash_briefings/{}".format(briefing_id))
return client.get(f"/api/alexa/flash_briefings/{briefing_id}")
async def test_flash_briefing_invalid_id(alexa_client):

View file

@ -36,7 +36,7 @@ async def test_api_list_state_entities(hass, mock_api_client):
async def test_api_get_state(hass, mock_api_client):
"""Test if the debug interface allows us to get a state."""
hass.states.async_set("hello.world", "nice", {"attr": 1})
resp = await mock_api_client.get(const.URL_API_STATES_ENTITY.format("hello.world"))
resp = await mock_api_client.get("/api/states/hello.world")
assert resp.status == 200
json = await resp.json()
@ -51,9 +51,7 @@ async def test_api_get_state(hass, mock_api_client):
async def test_api_get_non_existing_state(hass, mock_api_client):
"""Test if the debug interface allows us to get a state."""
resp = await mock_api_client.get(
const.URL_API_STATES_ENTITY.format("does_not_exist")
)
resp = await mock_api_client.get("/api/states/does_not_exist")
assert resp.status == 404
@ -62,8 +60,7 @@ async def test_api_state_change(hass, mock_api_client):
hass.states.async_set("test.test", "not_to_be_set")
await mock_api_client.post(
const.URL_API_STATES_ENTITY.format("test.test"),
json={"state": "debug_state_change2"},
"/api/states/test.test", json={"state": "debug_state_change2"}
)
assert hass.states.get("test.test").state == "debug_state_change2"
@ -75,8 +72,7 @@ async def test_api_state_change_of_non_existing_entity(hass, mock_api_client):
new_state = "debug_state_change"
resp = await mock_api_client.post(
const.URL_API_STATES_ENTITY.format("test_entity.that_does_not_exist"),
json={"state": new_state},
"/api/states/test_entity.that_does_not_exist", json={"state": new_state}
)
assert resp.status == 201
@ -88,7 +84,7 @@ async def test_api_state_change_of_non_existing_entity(hass, mock_api_client):
async def test_api_state_change_with_bad_data(hass, mock_api_client):
"""Test if API sends appropriate error if we omit state."""
resp = await mock_api_client.post(
const.URL_API_STATES_ENTITY.format("test_entity.that_does_not_exist"), json={}
"/api/states/test_entity.that_does_not_exist", json={}
)
assert resp.status == 400
@ -98,15 +94,13 @@ async def test_api_state_change_with_bad_data(hass, mock_api_client):
async def test_api_state_change_to_zero_value(hass, mock_api_client):
"""Test if changing a state to a zero value is possible."""
resp = await mock_api_client.post(
const.URL_API_STATES_ENTITY.format("test_entity.with_zero_state"),
json={"state": 0},
"/api/states/test_entity.with_zero_state", json={"state": 0}
)
assert resp.status == 201
resp = await mock_api_client.post(
const.URL_API_STATES_ENTITY.format("test_entity.with_zero_state"),
json={"state": 0.0},
"/api/states/test_entity.with_zero_state", json={"state": 0.0}
)
assert resp.status == 200
@ -126,15 +120,12 @@ async def test_api_state_change_push(hass, mock_api_client):
hass.bus.async_listen(const.EVENT_STATE_CHANGED, event_listener)
await mock_api_client.post(
const.URL_API_STATES_ENTITY.format("test.test"), json={"state": "not_to_be_set"}
)
await mock_api_client.post("/api/states/test.test", json={"state": "not_to_be_set"})
await hass.async_block_till_done()
assert len(events) == 0
await mock_api_client.post(
const.URL_API_STATES_ENTITY.format("test.test"),
json={"state": "not_to_be_set", "force_update": True},
"/api/states/test.test", json={"state": "not_to_be_set", "force_update": True}
)
await hass.async_block_till_done()
assert len(events) == 1
@ -152,7 +143,7 @@ async def test_api_fire_event_with_no_data(hass, mock_api_client):
hass.bus.async_listen_once("test.event_no_data", listener)
await mock_api_client.post(const.URL_API_EVENTS_EVENT.format("test.event_no_data"))
await mock_api_client.post("/api/events/test.event_no_data")
await hass.async_block_till_done()
assert len(test_value) == 1
@ -174,9 +165,7 @@ async def test_api_fire_event_with_data(hass, mock_api_client):
hass.bus.async_listen_once("test_event_with_data", listener)
await mock_api_client.post(
const.URL_API_EVENTS_EVENT.format("test_event_with_data"), json={"test": 1}
)
await mock_api_client.post("/api/events/test_event_with_data", json={"test": 1})
await hass.async_block_till_done()
@ -196,8 +185,7 @@ async def test_api_fire_event_with_invalid_json(hass, mock_api_client):
hass.bus.async_listen_once("test_event_bad_data", listener)
resp = await mock_api_client.post(
const.URL_API_EVENTS_EVENT.format("test_event_bad_data"),
data=json.dumps("not an object"),
"/api/events/test_event_bad_data", data=json.dumps("not an object")
)
await hass.async_block_till_done()
@ -207,8 +195,7 @@ async def test_api_fire_event_with_invalid_json(hass, mock_api_client):
# Try now with valid but unusable JSON
resp = await mock_api_client.post(
const.URL_API_EVENTS_EVENT.format("test_event_bad_data"),
data=json.dumps([1, 2, 3]),
"/api/events/test_event_bad_data", data=json.dumps([1, 2, 3])
)
await hass.async_block_till_done()
@ -272,9 +259,7 @@ async def test_api_call_service_no_data(hass, mock_api_client):
hass.services.async_register("test_domain", "test_service", listener)
await mock_api_client.post(
const.URL_API_SERVICES_SERVICE.format("test_domain", "test_service")
)
await mock_api_client.post("/api/services/test_domain/test_service")
await hass.async_block_till_done()
assert len(test_value) == 1
@ -295,8 +280,7 @@ async def test_api_call_service_with_data(hass, mock_api_client):
hass.services.async_register("test_domain", "test_service", listener)
await mock_api_client.post(
const.URL_API_SERVICES_SERVICE.format("test_domain", "test_service"),
json={"test": 1},
"/api/services/test_domain/test_service", json={"test": 1}
)
await hass.async_block_till_done()
@ -348,7 +332,7 @@ async def test_stream_with_restricted(hass, mock_api_client):
listen_count = _listen_count(hass)
resp = await mock_api_client.get(
"{}?restrict=test_event1,test_event3".format(const.URL_API_STREAM)
f"{const.URL_API_STREAM}?restrict=test_event1,test_event3"
)
assert resp.status == 200
assert listen_count + 1 == _listen_count(hass)
@ -403,7 +387,7 @@ async def test_api_error_log(hass, aiohttp_client, hass_access_token, hass_admin
) as mock_file:
resp = await client.get(
const.URL_API_ERROR_LOG,
headers={"Authorization": "Bearer {}".format(hass_access_token)},
headers={"Authorization": f"Bearer {hass_access_token}"},
)
assert len(mock_file.mock_calls) == 1
@ -415,7 +399,7 @@ async def test_api_error_log(hass, aiohttp_client, hass_access_token, hass_admin
hass_admin_user.groups = []
resp = await client.get(
const.URL_API_ERROR_LOG,
headers={"Authorization": "Bearer {}".format(hass_access_token)},
headers={"Authorization": f"Bearer {hass_access_token}"},
)
assert resp.status == 401
@ -432,8 +416,8 @@ async def test_api_fire_event_context(hass, mock_api_client, hass_access_token):
hass.bus.async_listen("test.event", listener)
await mock_api_client.post(
const.URL_API_EVENTS_EVENT.format("test.event"),
headers={"authorization": "Bearer {}".format(hass_access_token)},
"/api/events/test.event",
headers={"authorization": f"Bearer {hass_access_token}"},
)
await hass.async_block_till_done()
@ -449,7 +433,7 @@ async def test_api_call_service_context(hass, mock_api_client, hass_access_token
await mock_api_client.post(
"/api/services/test_domain/test_service",
headers={"authorization": "Bearer {}".format(hass_access_token)},
headers={"authorization": f"Bearer {hass_access_token}"},
)
await hass.async_block_till_done()
@ -464,7 +448,7 @@ async def test_api_set_state_context(hass, mock_api_client, hass_access_token):
await mock_api_client.post(
"/api/states/light.kitchen",
json={"state": "on"},
headers={"authorization": "Bearer {}".format(hass_access_token)},
headers={"authorization": f"Bearer {hass_access_token}"},
)
refresh_token = await hass.auth.async_validate_access_token(hass_access_token)
@ -542,9 +526,7 @@ async def test_rendering_template_legacy_user(
async def test_api_call_service_not_found(hass, mock_api_client):
"""Test if the API fails 400 if unknown service."""
resp = await mock_api_client.post(
const.URL_API_SERVICES_SERVICE.format("test_domain", "test_service")
)
resp = await mock_api_client.post("/api/services/test_domain/test_service")
assert resp.status == 400
@ -562,7 +544,6 @@ async def test_api_call_service_bad_data(hass, mock_api_client):
)
resp = await mock_api_client.post(
const.URL_API_SERVICES_SERVICE.format("test_domain", "test_service"),
json={"hello": 5},
"/api/services/test_domain/test_service", json={"hello": 5}
)
assert resp.status == 400

View file

@ -28,7 +28,7 @@ async def test_login_new_user_and_trying_refresh_token(hass, aiohttp_client):
step = await resp.json()
resp = await client.post(
"/auth/login_flow/{}".format(step["flow_id"]),
f"/auth/login_flow/{step['flow_id']}",
json={"client_id": CLIENT_ID, "username": "test-user", "password": "test-pass"},
)
@ -71,7 +71,7 @@ async def test_login_new_user_and_trying_refresh_token(hass, aiohttp_client):
assert resp.status == 401
resp = await client.get(
"/api/", headers={"authorization": "Bearer {}".format(tokens["access_token"])}
"/api/", headers={"authorization": f"Bearer {tokens['access_token']}"}
)
assert resp.status == 200

View file

@ -42,7 +42,7 @@ async def async_get_code(hass, aiohttp_client):
step = await resp.json()
resp = await client.post(
"/auth/login_flow/{}".format(step["flow_id"]),
f"/auth/login_flow/{step['flow_id']}",
json={"client_id": CLIENT_ID, "username": "2nd-user", "password": "2nd-pass"},
)
@ -67,7 +67,7 @@ async def test_link_user(hass, aiohttp_client):
resp = await client.post(
"/auth/link_user",
json={"client_id": CLIENT_ID, "code": code},
headers={"authorization": "Bearer {}".format(info["access_token"])},
headers={"authorization": f"Bearer {info['access_token']}"},
)
assert resp.status == 200
@ -84,7 +84,7 @@ async def test_link_user_invalid_client_id(hass, aiohttp_client):
resp = await client.post(
"/auth/link_user",
json={"client_id": "invalid", "code": code},
headers={"authorization": "Bearer {}".format(info["access_token"])},
headers={"authorization": f"Bearer {info['access_token']}"},
)
assert resp.status == 400
@ -100,7 +100,7 @@ async def test_link_user_invalid_code(hass, aiohttp_client):
resp = await client.post(
"/auth/link_user",
json={"client_id": CLIENT_ID, "code": "invalid"},
headers={"authorization": "Bearer {}".format(info["access_token"])},
headers={"authorization": f"Bearer {info['access_token']}"},
)
assert resp.status == 400

View file

@ -54,7 +54,7 @@ async def test_invalid_username_password(hass, aiohttp_client):
# Incorrect username
resp = await client.post(
"/auth/login_flow/{}".format(step["flow_id"]),
f"/auth/login_flow/{step['flow_id']}",
json={
"client_id": CLIENT_ID,
"username": "wrong-user",
@ -70,7 +70,7 @@ async def test_invalid_username_password(hass, aiohttp_client):
# Incorrect password
resp = await client.post(
"/auth/login_flow/{}".format(step["flow_id"]),
f"/auth/login_flow/{step['flow_id']}",
json={
"client_id": CLIENT_ID,
"username": "test-user",
@ -105,7 +105,7 @@ async def test_login_exist_user(hass, aiohttp_client):
step = await resp.json()
resp = await client.post(
"/auth/login_flow/{}".format(step["flow_id"]),
f"/auth/login_flow/{step['flow_id']}",
json={"client_id": CLIENT_ID, "username": "test-user", "password": "test-pass"},
)