Improve lists in integrations [R-S] (#113233)

* Improve lists in integrations [R-S]

* Fix

* Fix
This commit is contained in:
Joost Lekkerkerker 2024-03-13 21:55:00 +01:00 committed by GitHub
parent e6a692f354
commit 77917506bb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
60 changed files with 543 additions and 611 deletions

View file

@ -245,10 +245,11 @@ class RachioIro:
_deinit_webhooks(None)
# Choose which events to listen for and get their IDs
event_types = []
for event_type in self.rachio.notification.get_webhook_event_type()[1]:
if event_type[KEY_NAME] in LISTEN_EVENT_TYPES:
event_types.append({"id": event_type[KEY_ID]})
event_types = [
{"id": event_type[KEY_ID]}
for event_type in self.rachio.notification.get_webhook_event_type()[1]
if event_type[KEY_NAME] in LISTEN_EVENT_TYPES
]
# Register to listen to these events from the device
url = self.rachio.webhook_url

View file

@ -169,10 +169,13 @@ def _create_entities(hass: HomeAssistant, config_entry: ConfigEntry) -> list[Ent
schedules = controller.list_schedules()
flex_schedules = controller.list_flex_schedules()
current_schedule = controller.current_schedule
for zone in zones:
entities.append(RachioZone(person, controller, zone, current_schedule))
for sched in schedules + flex_schedules:
entities.append(RachioSchedule(person, controller, sched, current_schedule))
entities.extend(
RachioZone(person, controller, zone, current_schedule) for zone in zones
)
entities.extend(
RachioSchedule(person, controller, schedule, current_schedule)
for schedule in schedules + flex_schedules
)
return entities

View file

@ -45,8 +45,10 @@ def setup_platform(
else:
# create a sensor for each zone managed by faucet
for zone in raincloud.controller.faucet.zones:
sensors.append(RainCloudBinarySensor(zone, sensor_type))
sensors.extend(
RainCloudBinarySensor(zone, sensor_type)
for zone in raincloud.controller.faucet.zones
)
add_entities(sensors, True)

View file

@ -48,8 +48,10 @@ def setup_platform(
sensors.append(RainCloudSensor(raincloud.controller.faucet, sensor_type))
else:
# create a sensor for each zone managed by a faucet
for zone in raincloud.controller.faucet.zones:
sensors.append(RainCloudSensor(zone, sensor_type))
sensors.extend(
RainCloudSensor(zone, sensor_type)
for zone in raincloud.controller.faucet.zones
)
add_entities(sensors, True)

View file

@ -47,13 +47,14 @@ def setup_platform(
raincloud = hass.data[DATA_RAINCLOUD].data
default_watering_timer = config[CONF_WATERING_TIME]
sensors = []
for sensor_type in config[CONF_MONITORED_CONDITIONS]:
# create a sensor for each zone managed by faucet
for zone in raincloud.controller.faucet.zones:
sensors.append(RainCloudSwitch(default_watering_timer, zone, sensor_type))
add_entities(sensors, True)
add_entities(
(
RainCloudSwitch(default_watering_timer, zone, sensor_type)
for zone in raincloud.controller.faucet.zones
for sensor_type in config[CONF_MONITORED_CONDITIONS]
),
True,
)
class RainCloudSwitch(RainCloudEntity, SwitchEntity):

View file

@ -512,21 +512,20 @@ def _update_states_table_with_foreign_key_options(
) -> None:
"""Add the options to foreign key constraints."""
inspector = sqlalchemy.inspect(engine)
alters = []
for foreign_key in inspector.get_foreign_keys(TABLE_STATES):
if foreign_key["name"] and (
alters = [
{
"old_fk": ForeignKeyConstraint((), (), name=foreign_key["name"]),
"columns": foreign_key["constrained_columns"],
}
for foreign_key in inspector.get_foreign_keys(TABLE_STATES)
if foreign_key["name"]
and (
# MySQL/MariaDB will have empty options
not foreign_key.get("options")
or
# Postgres will have ondelete set to None
foreign_key.get("options", {}).get("ondelete") is None
):
alters.append(
{
"old_fk": ForeignKeyConstraint((), (), name=foreign_key["name"]),
"columns": foreign_key["constrained_columns"],
}
)
or foreign_key.get("options", {}).get("ondelete") is None
)
]
if not alters:
return

View file

@ -243,7 +243,6 @@ class ReolinkVODMediaSource(MediaSource):
start = now - dt.timedelta(days=31)
end = now
children: list[BrowseMediaSource] = []
if _LOGGER.isEnabledFor(logging.DEBUG):
_LOGGER.debug(
"Requesting recording days of %s from %s to %s",
@ -254,19 +253,19 @@ class ReolinkVODMediaSource(MediaSource):
statuses, _ = await host.api.request_vod_files(
channel, start, end, status_only=True, stream=stream
)
for status in statuses:
for day in status.days:
children.append(
BrowseMediaSource(
domain=DOMAIN,
identifier=f"DAY|{config_entry_id}|{channel}|{stream}|{status.year}|{status.month}|{day}",
media_class=MediaClass.DIRECTORY,
media_content_type=MediaType.PLAYLIST,
title=f"{status.year}/{status.month}/{day}",
can_play=False,
can_expand=True,
)
)
children: list[BrowseMediaSource] = [
BrowseMediaSource(
domain=DOMAIN,
identifier=f"DAY|{config_entry_id}|{channel}|{stream}|{status.year}|{status.month}|{day}",
media_class=MediaClass.DIRECTORY,
media_content_type=MediaType.PLAYLIST,
title=f"{status.year}/{status.month}/{day}",
can_play=False,
can_expand=True,
)
for status in statuses
for day in status.days
]
return BrowseMediaSource(
domain=DOMAIN,

View file

@ -51,21 +51,17 @@ async def async_get_actions(
except ValueError:
return []
actions = []
for action_type in ACTION_TYPES:
if hasattr(device, action_type):
data: dict[int, str] = getattr(device, ACTION_SELECTION[action_type], {})
for value in data.values():
actions.append(
{
CONF_DEVICE_ID: device_id,
CONF_DOMAIN: DOMAIN,
CONF_TYPE: action_type,
CONF_SUBTYPE: value,
}
)
return actions
return [
{
CONF_DEVICE_ID: device_id,
CONF_DOMAIN: DOMAIN,
CONF_TYPE: action_type,
CONF_SUBTYPE: value,
}
for action_type in ACTION_TYPES
if hasattr(device, action_type)
for value in getattr(device, ACTION_SELECTION[action_type], {}).values()
]
def _get_commands(

View file

@ -51,20 +51,17 @@ async def async_get_triggers(
"""List device triggers for RFXCOM RFXtrx devices."""
device = async_get_device_object(hass, device_id)
triggers = []
for conf_type in TRIGGER_TYPES:
data: dict[int, str] = getattr(device, TRIGGER_SELECTION[conf_type], {})
for command in data.values():
triggers.append(
{
CONF_PLATFORM: "device",
CONF_DEVICE_ID: device_id,
CONF_DOMAIN: DOMAIN,
CONF_TYPE: conf_type,
CONF_SUBTYPE: command,
}
)
return triggers
return [
{
CONF_PLATFORM: "device",
CONF_DEVICE_ID: device_id,
CONF_DOMAIN: DOMAIN,
CONF_TYPE: conf_type,
CONF_SUBTYPE: command,
}
for conf_type in TRIGGER_TYPES
for command in getattr(device, TRIGGER_SELECTION[conf_type], {}).values()
]
async def async_validate_trigger_config(

View file

@ -255,18 +255,15 @@ async def async_setup_entry(
device_id: DeviceTuple,
entity_info: dict[str, Any],
) -> list[Entity]:
entities: list[Entity] = []
for data_type in set(event.values) & set(SENSOR_TYPES_DICT):
entities.append(
RfxtrxSensor(
event.device,
device_id,
SENSOR_TYPES_DICT[data_type],
event=event if auto else None,
)
return [
RfxtrxSensor(
event.device,
device_id,
SENSOR_TYPES_DICT[data_type],
event=event if auto else None,
)
return entities
for data_type in set(event.values) & set(SENSOR_TYPES_DICT)
]
await async_setup_platform_entry(
hass, config_entry, async_add_entities, _supported, _constructor

View file

@ -34,10 +34,11 @@ async def async_get_config_entry_diagnostics(
) -> dict[str, Any]:
"""Return diagnostics for a config entry."""
ring: ring_doorbell.Ring = hass.data[DOMAIN][entry.entry_id]["api"]
devices_raw = []
for device_type in ring.devices_data:
for device_id in ring.devices_data[device_type]:
devices_raw.append(ring.devices_data[device_type][device_id])
devices_raw = [
ring.devices_data[device_type][device_id]
for device_type in ring.devices_data
for device_id in ring.devices_data[device_type]
]
return async_redact_data(
{"device_data": devices_raw},
TO_REDACT,

View file

@ -41,13 +41,12 @@ async def async_setup_entry(
devices_coordinator: RingDataCoordinator = hass.data[DOMAIN][config_entry.entry_id][
RING_DEVICES_COORDINATOR
]
lights = []
for device in devices["stickup_cams"]:
if device.has_capability("light"):
lights.append(RingLight(device, devices_coordinator))
async_add_entities(lights)
async_add_entities(
RingLight(device, devices_coordinator)
for device in devices["stickup_cams"]
if device.has_capability("light")
)
class RingLight(RingEntity, LightEntity):

View file

@ -28,12 +28,10 @@ async def async_setup_entry(
coordinator: RingDataCoordinator = hass.data[DOMAIN][config_entry.entry_id][
RING_DEVICES_COORDINATOR
]
sirens = []
for device in devices["chimes"]:
sirens.append(RingChimeSiren(device, coordinator))
async_add_entities(sirens)
async_add_entities(
RingChimeSiren(device, coordinator) for device in devices["chimes"]
)
class RingChimeSiren(RingEntity, SirenEntity):

View file

@ -38,13 +38,12 @@ async def async_setup_entry(
coordinator: RingDataCoordinator = hass.data[DOMAIN][config_entry.entry_id][
RING_DEVICES_COORDINATOR
]
switches = []
for device in devices["stickup_cams"]:
if device.has_capability("siren"):
switches.append(SirenSwitch(device, coordinator))
async_add_entities(switches)
async_add_entities(
SirenSwitch(device, coordinator)
for device in devices["stickup_cams"]
if device.has_capability("siren")
)
class BaseRingSwitch(RingEntity, SwitchEntity):

View file

@ -87,12 +87,10 @@ def build_setup_functions(
product_info: dict[str, HomeDataProduct],
) -> list[Coroutine[Any, Any, RoborockDataUpdateCoordinator | None]]:
"""Create a list of setup functions that can later be called asynchronously."""
setup_functions = []
for device in device_map.values():
setup_functions.append(
setup_device(hass, user_data, device, product_info[device.product_id])
)
return setup_functions
return [
setup_device(hass, user_data, device, product_info[device.product_id])
for device in device_map.values()
]
async def setup_device(

View file

@ -115,15 +115,13 @@ async def async_setup_entry(
coordinator: RokuDataUpdateCoordinator = hass.data[DOMAIN][entry.entry_id]
device: RokuDevice = coordinator.data
entities: list[RokuSelectEntity] = []
for description in ENTITIES:
entities.append(
RokuSelectEntity(
coordinator=coordinator,
description=description,
)
entities: list[RokuSelectEntity] = [
RokuSelectEntity(
coordinator=coordinator,
description=description,
)
for description in ENTITIES
]
if len(device.channels) > 0:
entities.append(

View file

@ -37,11 +37,11 @@ class BraavaJet(IRobotVacuum):
super().__init__(roomba, blid)
# Initialize fan speed list
speed_list = []
for behavior in BRAAVA_MOP_BEHAVIORS:
for spray in BRAAVA_SPRAY_AMOUNT:
speed_list.append(f"{behavior}-{spray}")
self._attr_fan_speed_list = speed_list
self._attr_fan_speed_list = [
f"{behavior}-{spray}"
for behavior in BRAAVA_MOP_BEHAVIORS
for spray in BRAAVA_SPRAY_AMOUNT
]
@property
def fan_speed(self):

View file

@ -58,9 +58,7 @@ def setup_platform(
russ = russound.Russound(host, port)
russ.connect()
sources = []
for source in config[CONF_SOURCES]:
sources.append(source["name"])
sources = [source["name"] for source in config[CONF_SOURCES]]
if russ.is_connected():
for zone_id, extra in config[CONF_ZONES].items():

View file

@ -79,7 +79,7 @@ async def async_setup_platform(
sensor_def = pysaj.Sensors(wifi)
# Use all sensors by default
hass_sensors = []
hass_sensors: list[SAJsensor] = []
kwargs = {}
if wifi:
@ -103,11 +103,11 @@ async def async_setup_platform(
if not done:
raise PlatformNotReady
for sensor in sensor_def:
if sensor.enabled:
hass_sensors.append(
SAJsensor(saj.serialnumber, sensor, inverter_name=config.get(CONF_NAME))
)
hass_sensors.extend(
SAJsensor(saj.serialnumber, sensor, inverter_name=config.get(CONF_NAME))
for sensor in sensor_def
if sensor.enabled
)
async_add_entities(hass_sensors)

View file

@ -45,17 +45,15 @@ async def async_setup_entry(
) -> None:
"""Set up binary_sensors based on a config entry."""
coordinator: SchlageDataUpdateCoordinator = hass.data[DOMAIN][config_entry.entry_id]
entities = []
for device_id in coordinator.data.locks:
for description in _DESCRIPTIONS:
entities.append(
SchlageBinarySensor(
coordinator=coordinator,
description=description,
device_id=device_id,
)
)
async_add_entities(entities)
async_add_entities(
SchlageBinarySensor(
coordinator=coordinator,
description=description,
device_id=device_id,
)
for device_id in coordinator.data.locks
for description in _DESCRIPTIONS
)
class SchlageBinarySensor(SchlageEntity, BinarySensorEntity):

View file

@ -62,17 +62,15 @@ async def async_setup_entry(
) -> None:
"""Set up switches based on a config entry."""
coordinator: SchlageDataUpdateCoordinator = hass.data[DOMAIN][config_entry.entry_id]
entities = []
for device_id in coordinator.data.locks:
for description in SWITCHES:
entities.append(
SchlageSwitch(
coordinator=coordinator,
description=description,
device_id=device_id,
)
)
async_add_entities(entities)
async_add_entities(
SchlageSwitch(
coordinator=coordinator,
description=description,
device_id=device_id,
)
for device_id in coordinator.data.locks
for description in SWITCHES
)
class SchlageSwitch(SchlageEntity, SwitchEntity):

View file

@ -175,32 +175,31 @@ async def async_setup_entry(
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up entry."""
entities: list[ScreenLogicBinarySensor] = []
coordinator: ScreenlogicDataUpdateCoordinator = hass.data[SL_DOMAIN][
config_entry.entry_id
]
gateway = coordinator.gateway
for core_sensor_description in SUPPORTED_CORE_SENSORS:
entities: list[ScreenLogicBinarySensor] = [
ScreenLogicPushBinarySensor(coordinator, core_sensor_description)
for core_sensor_description in SUPPORTED_CORE_SENSORS
if (
gateway.get_data(
*core_sensor_description.data_root, core_sensor_description.key
)
is not None
):
entities.append(
ScreenLogicPushBinarySensor(coordinator, core_sensor_description)
)
)
]
for p_index, p_data in gateway.get_data(DEVICE.PUMP).items():
if not p_data or not p_data.get(VALUE.DATA):
continue
for proto_pump_sensor_description in SUPPORTED_PUMP_SENSORS:
entities.append(
ScreenLogicPumpBinarySensor(
coordinator, copy(proto_pump_sensor_description), p_index
)
entities.extend(
ScreenLogicPumpBinarySensor(
coordinator, copy(proto_pump_sensor_description), p_index
)
for proto_pump_sensor_description in SUPPORTED_PUMP_SENSORS
)
chem_sensor_description: ScreenLogicPushBinarySensorDescription
for chem_sensor_description in SUPPORTED_INTELLICHEM_SENSORS:

View file

@ -47,26 +47,23 @@ async def async_setup_entry(
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up entry."""
entities = []
coordinator: ScreenlogicDataUpdateCoordinator = hass.data[SL_DOMAIN][
config_entry.entry_id
]
gateway = coordinator.gateway
for body_index in gateway.get_data(DEVICE.BODY):
entities.append(
ScreenLogicClimate(
coordinator,
ScreenLogicClimateDescription(
subscription_code=CODE.STATUS_CHANGED,
data_root=(DEVICE.BODY,),
key=body_index,
),
)
async_add_entities(
ScreenLogicClimate(
coordinator,
ScreenLogicClimateDescription(
subscription_code=CODE.STATUS_CHANGED,
data_root=(DEVICE.BODY,),
key=body_index,
),
)
async_add_entities(entities)
for body_index in gateway.get_data(DEVICE.BODY)
)
@dataclass(frozen=True, kw_only=True)

View file

@ -227,20 +227,21 @@ async def async_setup_entry(
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up entry."""
entities: list[ScreenLogicSensor] = []
coordinator: ScreenlogicDataUpdateCoordinator = hass.data[SL_DOMAIN][
config_entry.entry_id
]
gateway = coordinator.gateway
for core_sensor_description in SUPPORTED_CORE_SENSORS:
entities: list[ScreenLogicSensor] = [
ScreenLogicPushSensor(coordinator, core_sensor_description)
for core_sensor_description in SUPPORTED_CORE_SENSORS
if (
gateway.get_data(
*core_sensor_description.data_root, core_sensor_description.key
)
is not None
):
entities.append(ScreenLogicPushSensor(coordinator, core_sensor_description))
)
]
for pump_index, pump_data in gateway.get_data(DEVICE.PUMP).items():
if not pump_data or not pump_data.get(VALUE.DATA):

View file

@ -127,8 +127,10 @@ async def async_setup_entry(
)
)
for i in range(len(data.active_voltage)):
entities.append(SenseVoltageSensor(data, i, sense_monitor_id))
entities.extend(
SenseVoltageSensor(data, i, sense_monitor_id)
for i in range(len(data.active_voltage))
)
for type_id, typ in TRENDS_SENSOR_TYPES.items():
for variant_id, variant_name in TREND_SENSOR_VARIANTS:

View file

@ -230,11 +230,9 @@ class SensiboClimate(SensiboDeviceBaseEntity, ClimateEntity):
@property
def hvac_modes(self) -> list[HVACMode]:
"""Return the list of available hvac operation modes."""
hvac_modes = []
if TYPE_CHECKING:
assert self.device_data.hvac_modes
for mode in self.device_data.hvac_modes:
hvac_modes.append(SENSIBO_TO_HA[mode])
hvac_modes = [SENSIBO_TO_HA[mode] for mode in self.device_data.hvac_modes]
return hvac_modes if hvac_modes else [HVACMode.OFF]
@property

View file

@ -57,15 +57,12 @@ async def async_setup_platform(
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up the Seven segments OCR platform."""
entities = []
for camera in config[CONF_SOURCE]:
entities.append(
ImageProcessingSsocr(
hass, camera[CONF_ENTITY_ID], config, camera.get(CONF_NAME)
)
async_add_entities(
ImageProcessingSsocr(
hass, camera[CONF_ENTITY_ID], config, camera.get(CONF_NAME)
)
async_add_entities(entities)
for camera in config[CONF_SOURCE]
)
class ImageProcessingSsocr(ImageProcessingEntity):

View file

@ -188,8 +188,6 @@ def get_block_input_triggers(
if not is_block_momentary_input(device.settings, block, True):
return []
triggers = []
if block.type == "device" or get_number_of_channels(device, block) == 1:
subtype = "button"
else:
@ -203,20 +201,12 @@ def get_block_input_triggers(
else:
trigger_types = BASIC_INPUTS_EVENTS_TYPES
for trigger_type in trigger_types:
triggers.append((trigger_type, subtype))
return triggers
return [(trigger_type, subtype) for trigger_type in trigger_types]
def get_shbtn_input_triggers() -> list[tuple[str, str]]:
"""Return list of input triggers for SHBTN models."""
triggers = []
for trigger_type in SHBTN_INPUTS_EVENTS_TYPES:
triggers.append((trigger_type, "button"))
return triggers
return [(trigger_type, "button") for trigger_type in SHBTN_INPUTS_EVENTS_TYPES]
@singleton.singleton("shelly_coap")

View file

@ -52,10 +52,7 @@ def setup_platform(
auth = sigfox.auth
devices = sigfox.devices
sensors = []
for device in devices:
sensors.append(SigfoxDevice(device, auth, name))
add_entities(sensors, True)
add_entities((SigfoxDevice(device, auth, name) for device in devices), True)
def epoch_to_datetime(epoch_time):
@ -105,8 +102,7 @@ class SigfoxAPI:
url = urljoin(API_URL, location_url)
response = requests.get(url, auth=self._auth, timeout=10)
devices_data = json.loads(response.text)["data"]
for device in devices_data:
devices.append(device["id"])
devices.extend(device["id"] for device in devices_data)
return devices
@property

View file

@ -79,8 +79,10 @@ async def async_setup_entry(
if sensor.type in SUPPORTED_BATTERY_SENSOR_TYPES:
sensors.append(BatteryBinarySensor(simplisafe, system, sensor))
for lock in system.locks.values():
sensors.append(BatteryBinarySensor(simplisafe, system, lock))
sensors.extend(
BatteryBinarySensor(simplisafe, system, lock)
for lock in system.locks.values()
)
async_add_entities(sensors)

View file

@ -34,15 +34,16 @@ async def async_setup_entry(
) -> None:
"""Set up SimpliSafe locks based on a config entry."""
simplisafe = hass.data[DOMAIN][entry.entry_id]
locks = []
locks: list[SimpliSafeLock] = []
for system in simplisafe.systems.values():
if system.version == 2:
LOGGER.info("Skipping lock setup for V2 system: %s", system.system_id)
continue
for lock in system.locks.values():
locks.append(SimpliSafeLock(simplisafe, system, lock))
locks.extend(
SimpliSafeLock(simplisafe, system, lock) for lock in system.locks.values()
)
async_add_entities(locks)

View file

@ -25,16 +25,18 @@ async def async_setup_entry(
) -> None:
"""Set up SimpliSafe freeze sensors based on a config entry."""
simplisafe = hass.data[DOMAIN][entry.entry_id]
sensors = []
sensors: list[SimplisafeFreezeSensor] = []
for system in simplisafe.systems.values():
if system.version == 2:
LOGGER.info("Skipping sensor setup for V2 system: %s", system.system_id)
continue
for sensor in system.sensors.values():
if sensor.type == DeviceTypes.TEMPERATURE:
sensors.append(SimplisafeFreezeSensor(simplisafe, system, sensor))
sensors.extend(
SimplisafeFreezeSensor(simplisafe, system, sensor)
for sensor in system.sensors.values()
if sensor.type == DeviceTypes.TEMPERATURE
)
async_add_entities(sensors)

View file

@ -143,35 +143,35 @@ async def async_setup_entry(
"""Set up the SleepIQ bed sensors."""
data: SleepIQData = hass.data[DOMAIN][entry.entry_id]
entities = []
entities: list[SleepIQNumberEntity] = []
for bed in data.client.beds.values():
for sleeper in bed.sleepers:
entities.append(
SleepIQNumberEntity(
data.data_coordinator,
bed,
sleeper,
NUMBER_DESCRIPTIONS[FIRMNESS],
)
entities.extend(
SleepIQNumberEntity(
data.data_coordinator,
bed,
sleeper,
NUMBER_DESCRIPTIONS[FIRMNESS],
)
for actuator in bed.foundation.actuators:
entities.append(
SleepIQNumberEntity(
data.data_coordinator,
bed,
actuator,
NUMBER_DESCRIPTIONS[ACTUATOR],
)
for sleeper in bed.sleepers
)
entities.extend(
SleepIQNumberEntity(
data.data_coordinator,
bed,
actuator,
NUMBER_DESCRIPTIONS[ACTUATOR],
)
for foot_warmer in bed.foundation.foot_warmers:
entities.append(
SleepIQNumberEntity(
data.data_coordinator,
bed,
foot_warmer,
NUMBER_DESCRIPTIONS[FOOT_WARMING_TIMER],
)
for actuator in bed.foundation.actuators
)
entities.extend(
SleepIQNumberEntity(
data.data_coordinator,
bed,
foot_warmer,
NUMBER_DESCRIPTIONS[FOOT_WARMING_TIMER],
)
for foot_warmer in bed.foundation.foot_warmers
)
async_add_entities(entities)

View file

@ -844,19 +844,16 @@ async def async_setup_entry(
if TYPE_CHECKING:
assert config_entry.unique_id
entities = []
for sensor in used_sensors:
entities.append(
SMAsensor(
coordinator,
config_entry.unique_id,
SENSOR_ENTITIES.get(sensor.name),
device_info,
sensor,
)
async_add_entities(
SMAsensor(
coordinator,
config_entry.unique_id,
SENSOR_ENTITIES.get(sensor.name),
device_info,
sensor,
)
async_add_entities(entities)
for sensor in used_sensors
)
class SMAsensor(CoordinatorEntity, SensorEntity):

View file

@ -36,18 +36,18 @@ async def async_setup_entry(
)
)
elif actuator.type == "INFINITY_OUTPUT_MODULE":
for option in actuator.state_options:
entities.append(
SmappeeActuator(
smappee_base,
service_location,
actuator.name,
actuator_id,
actuator.type,
actuator.serialnumber,
actuator_state_option=option,
)
entities.extend(
SmappeeActuator(
smappee_base,
service_location,
actuator.name,
actuator_id,
actuator.type,
actuator.serialnumber,
actuator_state_option=option,
)
for option in actuator.state_options
)
async_add_entities(entities, True)

View file

@ -59,9 +59,9 @@ def get_capabilities(capabilities: Sequence[str]) -> Sequence[str] | None:
supported = [Capability.switch]
for capability in optional:
if capability in capabilities:
supported.append(capability)
supported.extend(
capability for capability in optional if capability in capabilities
)
return supported

View file

@ -85,15 +85,14 @@ async def async_setup_entry(
network_coordinator = sms_data[NETWORK_COORDINATOR]
gateway = sms_data[GATEWAY]
unique_id = str(await gateway.get_imei_async())
entities = []
for description in SIGNAL_SENSORS:
entities.append(
DeviceSensor(signal_coordinator, description, unique_id, gateway)
)
for description in NETWORK_SENSORS:
entities.append(
DeviceSensor(network_coordinator, description, unique_id, gateway)
)
entities = [
DeviceSensor(signal_coordinator, description, unique_id, gateway)
for description in SIGNAL_SENSORS
]
entities.extend(
DeviceSensor(network_coordinator, description, unique_id, gateway)
for description in NETWORK_SENSORS
)
async_add_entities(entities, True)

View file

@ -38,10 +38,9 @@ class SpeedTestDataCoordinator(DataUpdateCoordinator[dict[str, Any]]):
def update_servers(self) -> None:
"""Update list of test servers."""
test_servers = self.api.get_servers()
test_servers_list = []
for servers in test_servers.values():
for server in servers:
test_servers_list.append(server)
test_servers_list = [
server for servers in test_servers.values() for server in servers
]
for server in sorted(
test_servers_list,
key=lambda server: (

View file

@ -395,11 +395,11 @@ class SqueezeBoxEntity(MediaPlayerEntity):
player_ids = {
p.unique_id: p.entity_id for p in self.hass.data[DOMAIN][KNOWN_PLAYERS]
}
sync_group = []
for player in self._player.sync_group:
if player in player_ids:
sync_group.append(player_ids[player])
return sync_group
return [
player_ids[player]
for player in self._player.sync_group
if player in player_ids
]
@property
def sync_group(self):
@ -550,8 +550,7 @@ class SqueezeBoxEntity(MediaPlayerEntity):
"""
all_params = [command]
if parameters:
for parameter in parameters:
all_params.append(parameter)
all_params.extend(parameters)
await self._player.async_query(*all_params)
async def async_call_query(self, command, parameters=None):
@ -562,8 +561,7 @@ class SqueezeBoxEntity(MediaPlayerEntity):
"""
all_params = [command]
if parameters:
for parameter in parameters:
all_params.append(parameter)
all_params.extend(parameters)
self._query_result = await self._player.async_query(*all_params)
_LOGGER.debug("call_query got result %s", self._query_result)

View file

@ -24,12 +24,12 @@ async def async_setup_entry(
) -> None:
"""Set up the StarLine button."""
account: StarlineAccount = hass.data[DOMAIN][entry.entry_id]
entities = []
for device in account.api.devices.values():
if device.support_state:
for description in BUTTON_TYPES:
entities.append(StarlineButton(account, device, description))
async_add_entities(entities)
async_add_entities(
StarlineButton(account, device, description)
for device in account.api.devices.values()
if device.support_state
for description in BUTTON_TYPES
)
class StarlineButton(StarlineEntity, ButtonEntity):

View file

@ -16,11 +16,11 @@ async def async_setup_entry(
) -> None:
"""Set up StarLine entry."""
account: StarlineAccount = hass.data[DOMAIN][entry.entry_id]
entities = []
for device in account.api.devices.values():
if device.support_position:
entities.append(StarlineDeviceTracker(account, device))
async_add_entities(entities)
async_add_entities(
StarlineDeviceTracker(account, device)
for device in account.api.devices.values()
if device.support_position
)
class StarlineDeviceTracker(StarlineEntity, TrackerEntity, RestoreEntity):

View file

@ -54,18 +54,18 @@ def setup_platform(
) -> None:
"""Set up the Sterling Bank sensor platform."""
sensors = []
sensors: list[StarlingBalanceSensor] = []
for account in config[CONF_ACCOUNTS]:
try:
starling_account = StarlingAccount(
account[CONF_ACCESS_TOKEN], sandbox=account[CONF_SANDBOX]
)
for balance_type in account[CONF_BALANCE_TYPES]:
sensors.append(
StarlingBalanceSensor(
starling_account, account[CONF_NAME], balance_type
)
sensors.extend(
StarlingBalanceSensor(
starling_account, account[CONF_NAME], balance_type
)
for balance_type in account[CONF_BALANCE_TYPES]
)
except requests.exceptions.HTTPError as error:
_LOGGER.error(
"Unable to set up Starling account '%s': %s", account[CONF_NAME], error

View file

@ -36,11 +36,11 @@ async def async_setup_entry(
entry: dict = hass.data[DOMAIN][config_entry.entry_id]
coordinator: DataUpdateCoordinator = entry[ENTRY_COORDINATOR]
vehicle_info: dict = entry[ENTRY_VEHICLES]
entities: list[SubaruDeviceTracker] = []
for vehicle in vehicle_info.values():
if vehicle[VEHICLE_HAS_REMOTE_SERVICE]:
entities.append(SubaruDeviceTracker(vehicle, coordinator))
async_add_entities(entities)
async_add_entities(
SubaruDeviceTracker(vehicle, coordinator)
for vehicle in vehicle_info.values()
if vehicle[VEHICLE_HAS_REMOTE_SERVICE]
)
class SubaruDeviceTracker(

View file

@ -23,25 +23,18 @@ async def async_setup_entry(
) -> None:
"""Set up Sure PetCare locks on a config entry."""
entities: list[SurePetcareLock] = []
coordinator: SurePetcareDataCoordinator = hass.data[DOMAIN][entry.entry_id]
for surepy_entity in coordinator.data.values():
if surepy_entity.type not in [
EntityType.CAT_FLAP,
EntityType.PET_FLAP,
]:
continue
async_add_entities(
SurePetcareLock(surepy_entity.id, coordinator, lock_state)
for surepy_entity in coordinator.data.values()
if surepy_entity.type in [EntityType.CAT_FLAP, EntityType.PET_FLAP]
for lock_state in (
LockState.LOCKED_IN,
LockState.LOCKED_OUT,
LockState.LOCKED_ALL,
):
entities.append(SurePetcareLock(surepy_entity.id, coordinator, lock_state))
async_add_entities(entities)
)
)
class SurePetcareLock(SurePetcareEntity, LockEntity):

View file

@ -73,12 +73,13 @@ def setup_platform(
_LOGGER.error("The station doesn't exists: %s", station)
return
entities = []
for condition in monitored_conditions:
entities.append(SwissHydrologicalDataSensor(hydro_data, station, condition))
add_entities(entities, True)
add_entities(
(
SwissHydrologicalDataSensor(hydro_data, station, condition)
for condition in monitored_conditions
),
True,
)
class SwissHydrologicalDataSensor(SensorEntity):

View file

@ -62,15 +62,15 @@ async def async_setup_entry(
SyncThruMainSensor(coordinator, name),
SyncThruActiveAlertSensor(coordinator, name),
]
for key in supp_toner:
entities.append(SyncThruTonerSensor(coordinator, name, key))
for key in supp_drum:
entities.append(SyncThruDrumSensor(coordinator, name, key))
for key in supp_tray:
entities.append(SyncThruInputTraySensor(coordinator, name, key))
for int_key in supp_output_tray:
entities.append(SyncThruOutputTraySensor(coordinator, name, int_key))
entities.extend(SyncThruTonerSensor(coordinator, name, key) for key in supp_toner)
entities.extend(SyncThruDrumSensor(coordinator, name, key) for key in supp_drum)
entities.extend(
SyncThruInputTraySensor(coordinator, name, key) for key in supp_tray
)
entities.extend(
SyncThruOutputTraySensor(coordinator, name, int_key)
for int_key in supp_output_tray
)
async_add_entities(entities)

View file

@ -91,20 +91,18 @@ class SynologyPhotosMediaSource(MediaSource):
) -> list[BrowseMediaSource]:
"""Handle browsing different diskstations."""
if not item.identifier:
ret = []
for entry in self.entries:
ret.append(
BrowseMediaSource(
domain=DOMAIN,
identifier=entry.unique_id,
media_class=MediaClass.DIRECTORY,
media_content_type=MediaClass.IMAGE,
title=f"{entry.title} - {entry.unique_id}",
can_play=False,
can_expand=True,
)
return [
BrowseMediaSource(
domain=DOMAIN,
identifier=entry.unique_id,
media_class=MediaClass.DIRECTORY,
media_content_type=MediaClass.IMAGE,
title=f"{entry.title} - {entry.unique_id}",
can_play=False,
can_expand=True,
)
return ret
for entry in self.entries
]
identifier = SynologyPhotosMediaSourceIdentifier(item.identifier)
diskstation: SynologyDSMData = self.hass.data[DOMAIN][identifier.unique_id]

View file

@ -50,23 +50,20 @@ async def async_setup_entry(
"""Set up System Bridge binary sensor based on a config entry."""
coordinator: SystemBridgeDataUpdateCoordinator = hass.data[DOMAIN][entry.entry_id]
entities = []
for description in BASE_BINARY_SENSOR_TYPES:
entities.append(
SystemBridgeBinarySensor(coordinator, description, entry.data[CONF_PORT])
)
entities = [
SystemBridgeBinarySensor(coordinator, description, entry.data[CONF_PORT])
for description in BASE_BINARY_SENSOR_TYPES
]
if (
coordinator.data.battery
and coordinator.data.battery.percentage
and coordinator.data.battery.percentage > -1
):
for description in BATTERY_BINARY_SENSOR_TYPES:
entities.append(
SystemBridgeBinarySensor(
coordinator, description, entry.data[CONF_PORT]
)
)
entities.extend(
SystemBridgeBinarySensor(coordinator, description, entry.data[CONF_PORT])
for description in BATTERY_BINARY_SENSOR_TYPES
)
async_add_entities(entities)

View file

@ -88,22 +88,21 @@ class SystemBridgeSource(MediaSource):
def _build_bridges(self) -> BrowseMediaSource:
"""Build bridges for System Bridge media."""
children = []
for entry in self.hass.config_entries.async_entries(DOMAIN):
if entry.entry_id is not None:
children.append(
BrowseMediaSource(
domain=DOMAIN,
identifier=entry.entry_id,
media_class=MediaClass.DIRECTORY,
media_content_type="",
title=entry.title,
can_play=False,
can_expand=True,
children=[],
children_media_class=MediaClass.DIRECTORY,
)
)
children = [
BrowseMediaSource(
domain=DOMAIN,
identifier=entry.entry_id,
media_class=MediaClass.DIRECTORY,
media_content_type="",
title=entry.title,
can_play=False,
can_expand=True,
children=[],
children_media_class=MediaClass.DIRECTORY,
)
for entry in self.hass.config_entries.async_entries(DOMAIN)
if entry.entry_id is not None
]
return BrowseMediaSource(
domain=DOMAIN,

View file

@ -364,45 +364,44 @@ async def async_setup_entry(
"""Set up System Bridge sensor based on a config entry."""
coordinator: SystemBridgeDataUpdateCoordinator = hass.data[DOMAIN][entry.entry_id]
entities = []
for description in BASE_SENSOR_TYPES:
entities.append(
SystemBridgeSensor(coordinator, description, entry.data[CONF_PORT])
)
entities = [
SystemBridgeSensor(coordinator, description, entry.data[CONF_PORT])
for description in BASE_SENSOR_TYPES
]
for index_device, device in enumerate(coordinator.data.disks.devices):
if device.partitions is None:
continue
for index_partition, partition in enumerate(device.partitions):
entities.append(
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"filesystem_{partition.mount_point.replace(':', '')}",
name=f"{partition.mount_point} space used",
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=PERCENTAGE,
icon="mdi:harddisk",
value=(
lambda data,
dk=index_device,
pk=index_partition: partition_usage(data, dk, pk)
),
entities.extend(
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"filesystem_{partition.mount_point.replace(':', '')}",
name=f"{partition.mount_point} space used",
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=PERCENTAGE,
icon="mdi:harddisk",
value=(
lambda data,
dk=index_device,
pk=index_partition: partition_usage(data, dk, pk)
),
entry.data[CONF_PORT],
)
),
entry.data[CONF_PORT],
)
for index_partition, partition in enumerate(device.partitions)
)
if (
coordinator.data.battery
and coordinator.data.battery.percentage
and coordinator.data.battery.percentage > -1
):
for description in BATTERY_SENSOR_TYPES:
entities.append(
SystemBridgeSensor(coordinator, description, entry.data[CONF_PORT])
)
entities.extend(
SystemBridgeSensor(coordinator, description, entry.data[CONF_PORT])
for description in BATTERY_SENSOR_TYPES
)
entities.append(
SystemBridgeSensor(
@ -466,127 +465,128 @@ async def async_setup_entry(
]
for index, gpu in enumerate(coordinator.data.gpus):
entities = [
*entities,
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"gpu_{gpu.id}_core_clock_speed",
name=f"{gpu.name} clock speed",
entity_registry_enabled_default=False,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=UnitOfFrequency.MEGAHERTZ,
device_class=SensorDeviceClass.FREQUENCY,
icon="mdi:speedometer",
value=lambda data, k=index: gpu_core_clock_speed(data, k),
entities.extend(
[
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"gpu_{gpu.id}_core_clock_speed",
name=f"{gpu.name} clock speed",
entity_registry_enabled_default=False,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=UnitOfFrequency.MEGAHERTZ,
device_class=SensorDeviceClass.FREQUENCY,
icon="mdi:speedometer",
value=lambda data, k=index: gpu_core_clock_speed(data, k),
),
entry.data[CONF_PORT],
),
entry.data[CONF_PORT],
),
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"gpu_{gpu.id}_memory_clock_speed",
name=f"{gpu.name} memory clock speed",
entity_registry_enabled_default=False,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=UnitOfFrequency.MEGAHERTZ,
device_class=SensorDeviceClass.FREQUENCY,
icon="mdi:speedometer",
value=lambda data, k=index: gpu_memory_clock_speed(data, k),
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"gpu_{gpu.id}_memory_clock_speed",
name=f"{gpu.name} memory clock speed",
entity_registry_enabled_default=False,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=UnitOfFrequency.MEGAHERTZ,
device_class=SensorDeviceClass.FREQUENCY,
icon="mdi:speedometer",
value=lambda data, k=index: gpu_memory_clock_speed(data, k),
),
entry.data[CONF_PORT],
),
entry.data[CONF_PORT],
),
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"gpu_{gpu.id}_memory_free",
name=f"{gpu.name} memory free",
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=UnitOfInformation.MEGABYTES,
device_class=SensorDeviceClass.DATA_SIZE,
icon="mdi:memory",
value=lambda data, k=index: gpu_memory_free(data, k),
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"gpu_{gpu.id}_memory_free",
name=f"{gpu.name} memory free",
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=UnitOfInformation.MEGABYTES,
device_class=SensorDeviceClass.DATA_SIZE,
icon="mdi:memory",
value=lambda data, k=index: gpu_memory_free(data, k),
),
entry.data[CONF_PORT],
),
entry.data[CONF_PORT],
),
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"gpu_{gpu.id}_memory_used_percentage",
name=f"{gpu.name} memory used %",
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=PERCENTAGE,
icon="mdi:memory",
value=lambda data, k=index: gpu_memory_used_percentage(data, k),
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"gpu_{gpu.id}_memory_used_percentage",
name=f"{gpu.name} memory used %",
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=PERCENTAGE,
icon="mdi:memory",
value=lambda data, k=index: gpu_memory_used_percentage(data, k),
),
entry.data[CONF_PORT],
),
entry.data[CONF_PORT],
),
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"gpu_{gpu.id}_memory_used",
name=f"{gpu.name} memory used",
entity_registry_enabled_default=False,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=UnitOfInformation.MEGABYTES,
device_class=SensorDeviceClass.DATA_SIZE,
icon="mdi:memory",
value=lambda data, k=index: gpu_memory_used(data, k),
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"gpu_{gpu.id}_memory_used",
name=f"{gpu.name} memory used",
entity_registry_enabled_default=False,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=UnitOfInformation.MEGABYTES,
device_class=SensorDeviceClass.DATA_SIZE,
icon="mdi:memory",
value=lambda data, k=index: gpu_memory_used(data, k),
),
entry.data[CONF_PORT],
),
entry.data[CONF_PORT],
),
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"gpu_{gpu.id}_fan_speed",
name=f"{gpu.name} fan speed",
entity_registry_enabled_default=False,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=REVOLUTIONS_PER_MINUTE,
icon="mdi:fan",
value=lambda data, k=index: gpu_fan_speed(data, k),
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"gpu_{gpu.id}_fan_speed",
name=f"{gpu.name} fan speed",
entity_registry_enabled_default=False,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=REVOLUTIONS_PER_MINUTE,
icon="mdi:fan",
value=lambda data, k=index: gpu_fan_speed(data, k),
),
entry.data[CONF_PORT],
),
entry.data[CONF_PORT],
),
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"gpu_{gpu.id}_power_usage",
name=f"{gpu.name} power usage",
entity_registry_enabled_default=False,
device_class=SensorDeviceClass.POWER,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=UnitOfPower.WATT,
value=lambda data, k=index: gpu_power_usage(data, k),
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"gpu_{gpu.id}_power_usage",
name=f"{gpu.name} power usage",
entity_registry_enabled_default=False,
device_class=SensorDeviceClass.POWER,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=UnitOfPower.WATT,
value=lambda data, k=index: gpu_power_usage(data, k),
),
entry.data[CONF_PORT],
),
entry.data[CONF_PORT],
),
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"gpu_{gpu.id}_temperature",
name=f"{gpu.name} temperature",
entity_registry_enabled_default=False,
device_class=SensorDeviceClass.TEMPERATURE,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=UnitOfTemperature.CELSIUS,
value=lambda data, k=index: gpu_temperature(data, k),
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"gpu_{gpu.id}_temperature",
name=f"{gpu.name} temperature",
entity_registry_enabled_default=False,
device_class=SensorDeviceClass.TEMPERATURE,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=UnitOfTemperature.CELSIUS,
value=lambda data, k=index: gpu_temperature(data, k),
),
entry.data[CONF_PORT],
),
entry.data[CONF_PORT],
),
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"gpu_{gpu.id}_usage_percentage",
name=f"{gpu.name} usage %",
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=PERCENTAGE,
icon="mdi:percent",
value=lambda data, k=index: gpu_usage_percentage(data, k),
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"gpu_{gpu.id}_usage_percentage",
name=f"{gpu.name} usage %",
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=PERCENTAGE,
icon="mdi:percent",
value=lambda data, k=index: gpu_usage_percentage(data, k),
),
entry.data[CONF_PORT],
),
entry.data[CONF_PORT],
),
]
]
)
if coordinator.data.cpu.per_cpu is not None:
for cpu in coordinator.data.cpu.per_cpu:

View file

@ -92,21 +92,20 @@ async def async_setup_entry(
hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
) -> None:
"""Set up System Montor binary sensors based on a config entry."""
entities: list[SystemMonitorSensor] = []
coordinator: SystemMonitorCoordinator = hass.data[DOMAIN_COORDINATOR]
for sensor_description in SENSOR_TYPES:
_entry = entry.options.get(BINARY_SENSOR_DOMAIN, {})
for argument in _entry.get(CONF_PROCESS, []):
entities.append(
SystemMonitorSensor(
coordinator,
sensor_description,
entry.entry_id,
argument,
)
)
async_add_entities(entities)
async_add_entities(
SystemMonitorSensor(
coordinator,
sensor_description,
entry.entry_id,
argument,
)
for sensor_description in SENSOR_TYPES
for argument in entry.options.get(BINARY_SENSOR_DOMAIN, {}).get(
CONF_PROCESS, []
)
)
class SystemMonitorSensor(

View file

@ -658,9 +658,7 @@ def test_get_significant_states_only(
return hass.states.get(entity_id)
start = dt_util.utcnow() - timedelta(minutes=4)
points = []
for i in range(1, 4):
points.append(start + timedelta(minutes=i))
points = [start + timedelta(minutes=i) for i in range(1, 4)]
states = []
with freeze_time(start) as freezer:

View file

@ -517,9 +517,7 @@ def test_get_significant_states_only(
return hass.states.get(entity_id)
start = dt_util.utcnow() - timedelta(minutes=4)
points = []
for i in range(1, 4):
points.append(start + timedelta(minutes=i))
points = [start + timedelta(minutes=i) for i in range(1, 4)]
states = []
with freeze_time(start) as freezer:

View file

@ -507,9 +507,7 @@ def test_get_significant_states_only(
return hass.states.get(entity_id)
start = dt_util.utcnow() - timedelta(minutes=4)
points = []
for i in range(1, 4):
points.append(start + timedelta(minutes=i))
points = [start + timedelta(minutes=i) for i in range(1, 4)]
states = []
with freeze_time(start) as freezer:

View file

@ -914,18 +914,17 @@ async def test_stats_timestamp_conversion_is_reentrant(
def _get_all_short_term_stats() -> list[dict[str, Any]]:
with session_scope(hass=hass) as session:
results = []
for result in (
session.query(old_db_schema.StatisticsShortTerm)
.where(old_db_schema.StatisticsShortTerm.metadata_id == 1000)
.all()
):
results.append(
{
field.name: getattr(result, field.name)
for field in old_db_schema.StatisticsShortTerm.__table__.c
}
results = [
{
field.name: getattr(result, field.name)
for field in old_db_schema.StatisticsShortTerm.__table__.c
}
for result in (
session.query(old_db_schema.StatisticsShortTerm)
.where(old_db_schema.StatisticsShortTerm.metadata_id == 1000)
.all()
)
]
return sorted(results, key=lambda row: row["start_ts"])
# Do not optimize this block, its intentionally written to interleave
@ -1099,14 +1098,12 @@ async def test_stats_timestamp_with_one_by_one(
def _get_all_stats(table: old_db_schema.StatisticsBase) -> list[dict[str, Any]]:
"""Get all stats from a table."""
with session_scope(hass=hass) as session:
results = []
for result in session.query(table).where(table.metadata_id == 1000).all():
results.append(
{
field.name: getattr(result, field.name)
for field in table.__table__.c
}
)
results = [
{field.name: getattr(result, field.name) for field in table.__table__.c}
for result in session.query(table)
.where(table.metadata_id == 1000)
.all()
]
return sorted(results, key=lambda row: row["start_ts"])
def _insert_and_do_migration():
@ -1326,14 +1323,12 @@ async def test_stats_timestamp_with_one_by_one_removes_duplicates(
def _get_all_stats(table: old_db_schema.StatisticsBase) -> list[dict[str, Any]]:
"""Get all stats from a table."""
with session_scope(hass=hass) as session:
results = []
for result in session.query(table).where(table.metadata_id == 1000).all():
results.append(
{
field.name: getattr(result, field.name)
for field in table.__table__.c
}
)
results = [
{field.name: getattr(result, field.name) for field in table.__table__.c}
for result in session.query(table)
.where(table.metadata_id == 1000)
.all()
]
return sorted(results, key=lambda row: row["start_ts"])
def _insert_and_do_migration():

View file

@ -474,19 +474,18 @@ async def test_dhcp_ip_update(
const.DOMAIN, context={"source": config_entries.SOURCE_DHCP}, data=dhcp_data
)
expected_calls = []
for host in host_call_list:
expected_calls.append(
call(
host,
TEST_USERNAME,
TEST_PASSWORD,
port=TEST_PORT,
use_https=TEST_USE_HTTPS,
protocol=DEFAULT_PROTOCOL,
timeout=DEFAULT_TIMEOUT,
)
expected_calls = [
call(
host,
TEST_USERNAME,
TEST_PASSWORD,
port=TEST_PORT,
use_https=TEST_USE_HTTPS,
protocol=DEFAULT_PROTOCOL,
timeout=DEFAULT_TIMEOUT,
)
for host in host_call_list
]
assert reolink_connect_class.call_args_list == expected_calls

View file

@ -1292,17 +1292,16 @@ async def test_state_characteristics(hass: HomeAssistant) -> None:
"unit": "%",
},
)
sensors_config = []
for characteristic in characteristics:
sensors_config.append(
{
"platform": "statistics",
"name": f"test_{characteristic['source_sensor_domain']}_{characteristic['name']}",
"entity_id": f"{characteristic['source_sensor_domain']}.test_monitored",
"state_characteristic": characteristic["name"],
"max_age": {"minutes": 8}, # 9 values spaces by one minute
}
)
sensors_config = [
{
"platform": "statistics",
"name": f"test_{characteristic['source_sensor_domain']}_{characteristic['name']}",
"entity_id": f"{characteristic['source_sensor_domain']}.test_monitored",
"state_characteristic": characteristic["name"],
"max_age": {"minutes": 8}, # 9 values spaces by one minute
}
for characteristic in characteristics
]
with freeze_time(current_time) as freezer:
assert await async_setup_component(

View file

@ -69,10 +69,10 @@ class MockedInterface(dict):
def GetFriendList(self, steamid: str) -> dict:
"""Get friend list."""
fake_friends = [{"steamid": ACCOUNT_2}]
for _i in range(0, 4):
fake_friends.append(
{"steamid": "".join(random.choices(string.digits, k=len(ACCOUNT_1)))}
)
fake_friends.extend(
{"steamid": "".join(random.choices(string.digits, k=len(ACCOUNT_1)))}
for _ in range(0, 4)
)
return {"friendslist": {"friends": fake_friends}}
def GetPlayerSummaries(self, steamids: str | list[str]) -> dict:

View file

@ -399,9 +399,7 @@ async def test_hls_max_segments(
# Only NUM_PLAYLIST_SEGMENTS are returned in the playlist.
start = MAX_SEGMENTS + 1 - NUM_PLAYLIST_SEGMENTS
segments = []
for sequence in range(start, MAX_SEGMENTS + 1):
segments.append(make_segment(sequence))
segments = [make_segment(sequence) for sequence in range(start, MAX_SEGMENTS + 1)]
assert await resp.text() == make_playlist(sequence=start, segments=segments)
# Fetch the actual segments with a fake byte payload
@ -497,9 +495,7 @@ async def test_hls_max_segments_discontinuity(
# EXT-X-DISCONTINUITY tag to be omitted and EXT-X-DISCONTINUITY-SEQUENCE
# returned instead.
start = MAX_SEGMENTS + 1 - NUM_PLAYLIST_SEGMENTS
segments = []
for sequence in range(start, MAX_SEGMENTS + 1):
segments.append(make_segment(sequence))
segments = [make_segment(sequence) for sequence in range(start, MAX_SEGMENTS + 1)]
assert await resp.text() == make_playlist(
sequence=start,
discontinuity_sequence=1,

View file

@ -96,10 +96,10 @@ def make_segment_with_parts(
response = []
if discontinuity:
response.append("#EXT-X-DISCONTINUITY")
for i in range(num_parts):
response.append(
f'#EXT-X-PART:DURATION={TEST_PART_DURATION:.3f},URI="./segment/{segment}.{i}.m4s"{",INDEPENDENT=YES" if i%independent_period==0 else ""}'
)
response.extend(
f'#EXT-X-PART:DURATION={TEST_PART_DURATION:.3f},URI="./segment/{segment}.{i}.m4s"{",INDEPENDENT=YES" if i%independent_period==0 else ""}'
for i in range(num_parts)
)
response.extend(
[
"#EXT-X-PROGRAM-DATE-TIME:"