Use assignment expressions 10 (#57791)

This commit is contained in:
Marc Mueller 2021-10-15 21:36:03 +02:00 committed by GitHub
parent 9be3278ffa
commit 12d1dfdaf9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 44 additions and 87 deletions

View file

@ -159,8 +159,7 @@ async def _async_get_device_automations(
for device_id in match_device_ids:
combined_results[device_id] = []
device = device_registry.async_get(device_id)
if device is None:
if (device := device_registry.async_get(device_id)) is None:
raise DeviceNotFound
for entry_id in device.config_entries:
if config_entry := hass.config_entries.async_get_entry(entry_id):
@ -221,8 +220,7 @@ async def _async_get_device_automation_capabilities(hass, automation_type, autom
capabilities = capabilities.copy()
extra_fields = capabilities.get("extra_fields")
if extra_fields is None:
if (extra_fields := capabilities.get("extra_fields")) is None:
capabilities["extra_fields"] = []
else:
capabilities["extra_fields"] = voluptuous_serialize.convert(

View file

@ -129,8 +129,7 @@ async def async_call_action_from_config(
@callback
def async_condition_from_config(config: ConfigType) -> condition.ConditionCheckerType:
"""Evaluate state based on configuration."""
condition_type = config[CONF_TYPE]
if condition_type == CONF_IS_ON:
if config[CONF_TYPE] == CONF_IS_ON:
stat = "on"
else:
stat = "off"
@ -152,8 +151,7 @@ async def async_attach_trigger(
automation_info: AutomationTriggerInfo,
) -> CALLBACK_TYPE:
"""Listen for state changes based on configuration."""
trigger_type = config[CONF_TYPE]
if trigger_type == CONF_TURNED_ON:
if config[CONF_TYPE] == CONF_TURNED_ON:
to_state = "on"
else:
to_state = "off"

View file

@ -294,9 +294,7 @@ class HueOneLightStateView(HomeAssistantView):
)
return self.json_message("Entity not found", HTTPStatus.NOT_FOUND)
entity = hass.states.get(hass_entity_id)
if entity is None:
if (entity := hass.states.get(hass_entity_id)) is None:
_LOGGER.error("Entity not found: %s", hass_entity_id)
return self.json_message("Entity not found", HTTPStatus.NOT_FOUND)
@ -333,9 +331,7 @@ class HueOneLightChangeView(HomeAssistantView):
_LOGGER.error("Unknown entity number: %s", entity_number)
return self.json_message("Entity not found", HTTPStatus.NOT_FOUND)
entity = hass.states.get(entity_id)
if entity is None:
if (entity := hass.states.get(entity_id)) is None:
_LOGGER.error("Entity not found: %s", entity_id)
return self.json_message("Entity not found", HTTPStatus.NOT_FOUND)
@ -544,8 +540,7 @@ class HueOneLightChangeView(HomeAssistantView):
):
domain = entity.domain
# Convert 0-100 to a fan speed
brightness = parsed[STATE_BRIGHTNESS]
if brightness == 0:
if (brightness := parsed[STATE_BRIGHTNESS]) == 0:
data[ATTR_SPEED] = SPEED_OFF
elif 0 < brightness <= 33.3:
data[ATTR_SPEED] = SPEED_LOW

View file

@ -50,9 +50,7 @@ async def _async_reproduce_state(
reproduce_options: dict[str, Any] | None = None,
) -> None:
"""Reproduce a single state."""
cur_state = hass.states.get(state.entity_id)
if cur_state is None:
if (cur_state := hass.states.get(state.entity_id)) is None:
_LOGGER.warning("Unable to find entity %s", state.entity_id)
return

View file

@ -53,8 +53,7 @@ async def _get_entity_and_device(
hass.helpers.entity_registry.async_get_registry(),
)
entity_entry = ent_reg.async_get(entity_id)
if not entity_entry:
if not (entity_entry := ent_reg.async_get(entity_id)):
return None, None
device_entry = dev_reg.devices.get(entity_entry.device_id)
return entity_entry, device_entry
@ -500,8 +499,7 @@ class GoogleEntity:
}
# use aliases
aliases = entity_config.get(CONF_ALIASES)
if aliases:
if aliases := entity_config.get(CONF_ALIASES):
device["name"]["nicknames"] = [name] + aliases
if self.config.is_local_sdk_active and self.should_expose_local():
@ -518,8 +516,7 @@ class GoogleEntity:
for trt in traits:
device["attributes"].update(trt.sync_attributes())
room = entity_config.get(CONF_ROOM_HINT)
if room:
if room := entity_config.get(CONF_ROOM_HINT):
device["roomHint"] = room
else:
area = await _get_area(self.hass, entity_entry, device_entry)

View file

@ -45,9 +45,7 @@ async def _process(hass, data, message):
"payload": {"errorCode": ERR_PROTOCOL_ERROR},
}
handler = HANDLERS.get(inputs[0].get("intent"))
if handler is None:
if (handler := HANDLERS.get(inputs[0].get("intent"))) is None:
return {
"requestId": data.request_id,
"payload": {"errorCode": ERR_PROTOCOL_ERROR},
@ -131,9 +129,8 @@ async def async_devices_query(hass, data, payload):
devices = {}
for device in payload_devices:
devid = device["id"]
state = hass.states.get(devid)
if not state:
if not (state := hass.states.get(devid)):
# If we can't find a state, the device is offline
devices[devid] = {"online": False}
continue
@ -199,9 +196,7 @@ async def handle_devices_execute(hass, data, payload):
executions[entity_id].append(execution)
continue
state = hass.states.get(entity_id)
if state is None:
if (state := hass.states.get(entity_id)) is None:
results[entity_id] = {
"ids": [entity_id],
"status": "ERROR",

View file

@ -954,16 +954,14 @@ class TemperatureSettingTrait(_Trait):
1,
)
else:
target_temp = attrs.get(ATTR_TEMPERATURE)
if target_temp is not None:
if (target_temp := attrs.get(ATTR_TEMPERATURE)) is not None:
target_temp = round(
temp_util.convert(target_temp, unit, TEMP_CELSIUS), 1
)
response["thermostatTemperatureSetpointHigh"] = target_temp
response["thermostatTemperatureSetpointLow"] = target_temp
else:
target_temp = attrs.get(ATTR_TEMPERATURE)
if target_temp is not None:
if (target_temp := attrs.get(ATTR_TEMPERATURE)) is not None:
response["thermostatTemperatureSetpoint"] = round(
temp_util.convert(target_temp, unit, TEMP_CELSIUS), 1
)
@ -1306,11 +1304,9 @@ class ArmDisArmTrait(_Trait):
async def execute(self, command, data, params, challenge):
"""Execute an ArmDisarm command."""
if params["arm"] and not params.get("cancel"):
arm_level = params.get("armLevel")
# If no arm level given, we can only arm it if there is
# only one supported arm type. We never default to triggered.
if not arm_level:
if not (arm_level := params.get("armLevel")):
states = self._supported_states()
if STATE_ALARM_TRIGGERED in states:
@ -1554,9 +1550,7 @@ class ModesTrait(_Trait):
if self.state.domain != domain:
continue
items = self.state.attributes.get(attr)
if items is not None:
if (items := self.state.attributes.get(attr)) is not None:
modes.append(self._generate(name, items))
# Shortcut since all domains are currently unique
@ -1668,19 +1662,19 @@ class ModesTrait(_Trait):
)
return
if self.state.domain == media_player.DOMAIN:
sound_mode = settings.get("sound mode")
if sound_mode:
await self.hass.services.async_call(
media_player.DOMAIN,
media_player.SERVICE_SELECT_SOUND_MODE,
{
ATTR_ENTITY_ID: self.state.entity_id,
media_player.ATTR_SOUND_MODE: sound_mode,
},
blocking=True,
context=data.context,
)
if self.state.domain == media_player.DOMAIN and (
sound_mode := settings.get("sound mode")
):
await self.hass.services.async_call(
media_player.DOMAIN,
media_player.SERVICE_SELECT_SOUND_MODE,
{
ATTR_ENTITY_ID: self.state.entity_id,
media_player.ATTR_SOUND_MODE: sound_mode,
},
blocking=True,
context=data.context,
)
_LOGGER.info(
"Received an Options command for unrecognised domain %s",
@ -2042,9 +2036,7 @@ def _verify_pin_challenge(data, state, challenge):
if not challenge:
raise ChallengeNeeded(CHALLENGE_PIN_NEEDED)
pin = challenge.get("pin")
if pin != data.config.secure_devices_pin:
if challenge.get("pin") != data.config.secure_devices_pin:
raise ChallengeNeeded(CHALLENGE_FAILED_PIN_NEEDED)
@ -2320,8 +2312,7 @@ class SensorStateTrait(_Trait):
def sync_attributes(self):
"""Return attributes for a sync request."""
device_class = self.state.attributes.get(ATTR_DEVICE_CLASS)
data = self.sensor_types.get(device_class)
if data is not None:
if (data := self.sensor_types.get(device_class)) is not None:
return {
"sensorStatesSupported": {
"name": data[0],
@ -2332,8 +2323,7 @@ class SensorStateTrait(_Trait):
def query_attributes(self):
"""Return the attributes of this trait for this entity."""
device_class = self.state.attributes.get(ATTR_DEVICE_CLASS)
data = self.sensor_types.get(device_class)
if data is not None:
if (data := self.sensor_types.get(device_class)) is not None:
return {
"currentSensorStateData": [
{"name": data[0], "rawValue": self.state.state}

View file

@ -121,9 +121,7 @@ def is_on(hass, entity_id):
# Integration not setup yet, it cannot be on
return False
state = hass.states.get(entity_id)
if state is not None:
if (state := hass.states.get(entity_id)) is not None:
return state.state in hass.data[REG_KEY].on_off_mapping
return False
@ -213,9 +211,7 @@ def groups_with_entity(hass: HomeAssistant, entity_id: str) -> list[str]:
async def async_setup(hass, config):
"""Set up all groups found defined in the configuration."""
component = hass.data.get(DOMAIN)
if component is None:
if (component := hass.data.get(DOMAIN)) is None:
component = hass.data[DOMAIN] = EntityComponent(_LOGGER, DOMAIN, hass)
hass.data[REG_KEY] = GroupIntegrationRegistry()
@ -507,9 +503,7 @@ class Group(Entity):
)
# If called before the platform async_setup is called (test cases)
component = hass.data.get(DOMAIN)
if component is None:
if (component := hass.data.get(DOMAIN)) is None:
component = hass.data[DOMAIN] = EntityComponent(_LOGGER, DOMAIN, hass)
await component.async_add_entities([group])
@ -661,9 +655,8 @@ class Group(Entity):
return
self.async_set_context(event.context)
new_state = event.data.get("new_state")
if new_state is None:
if (new_state := event.data.get("new_state")) is None:
# The state was removed from the state machine
self._reset_tracked_state()
@ -677,9 +670,7 @@ class Group(Entity):
self._on_states = set()
for entity_id in self.trackable:
state = self.hass.states.get(entity_id)
if state is not None:
if (state := self.hass.states.get(entity_id)) is not None:
self._see_state(state)
def _see_state(self, new_state):

View file

@ -112,8 +112,7 @@ class CoverGroup(GroupEntity, CoverEntity):
async def _update_supported_features_event(self, event: Event) -> None:
self.async_set_context(event.context)
entity = event.data.get("entity_id")
if entity is not None:
if (entity := event.data.get("entity_id")) is not None:
await self.async_update_supported_features(
entity, event.data.get("new_state")
)
@ -168,8 +167,7 @@ class CoverGroup(GroupEntity, CoverEntity):
async def async_added_to_hass(self) -> None:
"""Register listeners."""
for entity_id in self._entities:
new_state = self.hass.states.get(entity_id)
if new_state is None:
if (new_state := self.hass.states.get(entity_id)) is None:
continue
await self.async_update_supported_features(
entity_id, new_state, update_state=False
@ -264,8 +262,7 @@ class CoverGroup(GroupEntity, CoverEntity):
self._attr_is_opening = False
has_valid_state = False
for entity_id in self._entities:
state = self.hass.states.get(entity_id)
if not state:
if not (state := self.hass.states.get(entity_id)):
continue
if state.state == STATE_OPEN:
self._attr_is_closed = False
@ -322,8 +319,7 @@ class CoverGroup(GroupEntity, CoverEntity):
if not self._attr_assumed_state:
for entity_id in self._entities:
state = self.hass.states.get(entity_id)
if state is None:
if (state := self.hass.states.get(entity_id)) is None:
continue
if state and state.attributes.get(ATTR_ASSUMED_STATE):
self._attr_assumed_state = True

View file

@ -11,8 +11,7 @@ from homeassistant.core import State
def find_state_attributes(states: list[State], key: str) -> Iterator[Any]:
"""Find attributes with matching key from states."""
for state in states:
value = state.attributes.get(key)
if value is not None:
if (value := state.attributes.get(key)) is not None:
yield value