Use shorthand attrs for MQTT cover (#100710)
* User shorthand attrs for MQTT cover * Cleanup constructor * Cleanup constructor
This commit is contained in:
parent
4133162053
commit
4c65c92fb0
1 changed files with 38 additions and 80 deletions
|
@ -235,26 +235,12 @@ async def _async_setup_entity(
|
|||
class MqttCover(MqttEntity, CoverEntity):
|
||||
"""Representation of a cover that can be controlled using MQTT."""
|
||||
|
||||
_attr_is_closed: bool | None = None
|
||||
_attributes_extra_blocked: frozenset[str] = MQTT_COVER_ATTRIBUTES_BLOCKED
|
||||
_default_name = DEFAULT_NAME
|
||||
_entity_id_format: str = cover.ENTITY_ID_FORMAT
|
||||
_attributes_extra_blocked: frozenset[str] = MQTT_COVER_ATTRIBUTES_BLOCKED
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
hass: HomeAssistant,
|
||||
config: ConfigType,
|
||||
config_entry: ConfigEntry,
|
||||
discovery_data: DiscoveryInfoType | None,
|
||||
) -> None:
|
||||
"""Initialize the cover."""
|
||||
self._position: int | None = None
|
||||
self._state: str | None = None
|
||||
|
||||
self._optimistic: bool | None = None
|
||||
self._tilt_value: int | None = None
|
||||
self._tilt_optimistic: bool | None = None
|
||||
|
||||
MqttEntity.__init__(self, hass, config, config_entry, discovery_data)
|
||||
_optimistic: bool
|
||||
_tilt_optimistic: bool
|
||||
|
||||
@staticmethod
|
||||
def config_schema() -> vol.Schema:
|
||||
|
@ -287,21 +273,17 @@ class MqttCover(MqttEntity, CoverEntity):
|
|||
and config.get(CONF_TILT_STATUS_TOPIC) is None
|
||||
)
|
||||
|
||||
if config[CONF_OPTIMISTIC] or (
|
||||
self._optimistic = config[CONF_OPTIMISTIC] or (
|
||||
(no_position or optimistic_position)
|
||||
and (no_state or optimistic_state)
|
||||
and (no_tilt or optimistic_tilt)
|
||||
):
|
||||
# Force into optimistic mode.
|
||||
self._optimistic = True
|
||||
self._attr_assumed_state = bool(self._optimistic)
|
||||
)
|
||||
self._attr_assumed_state = self._optimistic
|
||||
|
||||
if (
|
||||
self._tilt_optimistic = (
|
||||
config[CONF_TILT_STATE_OPTIMISTIC]
|
||||
or config.get(CONF_TILT_STATUS_TOPIC) is None
|
||||
):
|
||||
# Force into optimistic tilt mode.
|
||||
self._tilt_optimistic = True
|
||||
)
|
||||
|
||||
template_config_attributes = {
|
||||
"position_open": self._config[CONF_POSITION_OPEN],
|
||||
|
@ -354,6 +336,13 @@ class MqttCover(MqttEntity, CoverEntity):
|
|||
|
||||
self._attr_supported_features = supported_features
|
||||
|
||||
@callback
|
||||
def _update_state(self, state: str) -> None:
|
||||
"""Update the cover state."""
|
||||
self._attr_is_closed = state == STATE_CLOSED
|
||||
self._attr_is_opening = state == STATE_OPENING
|
||||
self._attr_is_closing = state == STATE_CLOSING
|
||||
|
||||
def _prepare_subscribe_topics(self) -> None:
|
||||
"""(Re)Subscribe to topics."""
|
||||
topics = {}
|
||||
|
@ -380,25 +369,24 @@ class MqttCover(MqttEntity, CoverEntity):
|
|||
_LOGGER.debug("Ignoring empty state message from '%s'", msg.topic)
|
||||
return
|
||||
|
||||
state: str
|
||||
if payload == self._config[CONF_STATE_STOPPED]:
|
||||
if self._config.get(CONF_GET_POSITION_TOPIC) is not None:
|
||||
self._state = (
|
||||
state = (
|
||||
STATE_CLOSED
|
||||
if self._position == DEFAULT_POSITION_CLOSED
|
||||
if self._attr_current_cover_position == DEFAULT_POSITION_CLOSED
|
||||
else STATE_OPEN
|
||||
)
|
||||
else:
|
||||
self._state = (
|
||||
STATE_CLOSED if self._state == STATE_CLOSING else STATE_OPEN
|
||||
)
|
||||
state = STATE_CLOSED if self.state == STATE_CLOSING else STATE_OPEN
|
||||
elif payload == self._config[CONF_STATE_OPENING]:
|
||||
self._state = STATE_OPENING
|
||||
state = STATE_OPENING
|
||||
elif payload == self._config[CONF_STATE_CLOSING]:
|
||||
self._state = STATE_CLOSING
|
||||
state = STATE_CLOSING
|
||||
elif payload == self._config[CONF_STATE_OPEN]:
|
||||
self._state = STATE_OPEN
|
||||
state = STATE_OPEN
|
||||
elif payload == self._config[CONF_STATE_CLOSED]:
|
||||
self._state = STATE_CLOSED
|
||||
state = STATE_CLOSED
|
||||
else:
|
||||
_LOGGER.warning(
|
||||
(
|
||||
|
@ -408,6 +396,7 @@ class MqttCover(MqttEntity, CoverEntity):
|
|||
payload,
|
||||
)
|
||||
return
|
||||
self._update_state(state)
|
||||
|
||||
get_mqtt_data(self.hass).state_write_requests.write_state_request(self)
|
||||
|
||||
|
@ -447,9 +436,9 @@ class MqttCover(MqttEntity, CoverEntity):
|
|||
_LOGGER.warning("Payload '%s' is not numeric", payload)
|
||||
return
|
||||
|
||||
self._position = percentage_payload
|
||||
self._attr_current_cover_position = percentage_payload
|
||||
if self._config.get(CONF_STATE_TOPIC) is None:
|
||||
self._state = (
|
||||
self._update_state(
|
||||
STATE_CLOSED
|
||||
if percentage_payload == DEFAULT_POSITION_CLOSED
|
||||
else STATE_OPEN
|
||||
|
@ -489,37 +478,6 @@ class MqttCover(MqttEntity, CoverEntity):
|
|||
"""(Re)Subscribe to topics."""
|
||||
await subscription.async_subscribe_topics(self.hass, self._sub_state)
|
||||
|
||||
@property
|
||||
def is_closed(self) -> bool | None:
|
||||
"""Return true if the cover is closed or None if the status is unknown."""
|
||||
if self._state is None:
|
||||
return None
|
||||
|
||||
return self._state == STATE_CLOSED
|
||||
|
||||
@property
|
||||
def is_opening(self) -> bool:
|
||||
"""Return true if the cover is actively opening."""
|
||||
return self._state == STATE_OPENING
|
||||
|
||||
@property
|
||||
def is_closing(self) -> bool:
|
||||
"""Return true if the cover is actively closing."""
|
||||
return self._state == STATE_CLOSING
|
||||
|
||||
@property
|
||||
def current_cover_position(self) -> int | None:
|
||||
"""Return current position of cover.
|
||||
|
||||
None is unknown, 0 is closed, 100 is fully open.
|
||||
"""
|
||||
return self._position
|
||||
|
||||
@property
|
||||
def current_cover_tilt_position(self) -> int | None:
|
||||
"""Return current position of cover tilt."""
|
||||
return self._tilt_value
|
||||
|
||||
async def async_open_cover(self, **kwargs: Any) -> None:
|
||||
"""Move the cover up.
|
||||
|
||||
|
@ -534,9 +492,9 @@ class MqttCover(MqttEntity, CoverEntity):
|
|||
)
|
||||
if self._optimistic:
|
||||
# Optimistically assume that cover has changed state.
|
||||
self._state = STATE_OPEN
|
||||
self._update_state(STATE_OPEN)
|
||||
if self._config.get(CONF_GET_POSITION_TOPIC):
|
||||
self._position = self.find_percentage_in_range(
|
||||
self._attr_current_cover_position = self.find_percentage_in_range(
|
||||
self._config[CONF_POSITION_OPEN], COVER_PAYLOAD
|
||||
)
|
||||
self.async_write_ha_state()
|
||||
|
@ -555,9 +513,9 @@ class MqttCover(MqttEntity, CoverEntity):
|
|||
)
|
||||
if self._optimistic:
|
||||
# Optimistically assume that cover has changed state.
|
||||
self._state = STATE_CLOSED
|
||||
self._update_state(STATE_CLOSED)
|
||||
if self._config.get(CONF_GET_POSITION_TOPIC):
|
||||
self._position = self.find_percentage_in_range(
|
||||
self._attr_current_cover_position = self.find_percentage_in_range(
|
||||
self._config[CONF_POSITION_CLOSED], COVER_PAYLOAD
|
||||
)
|
||||
self.async_write_ha_state()
|
||||
|
@ -595,7 +553,7 @@ class MqttCover(MqttEntity, CoverEntity):
|
|||
self._config[CONF_ENCODING],
|
||||
)
|
||||
if self._tilt_optimistic:
|
||||
self._tilt_value = self.find_percentage_in_range(
|
||||
self._attr_current_cover_tilt_position = self.find_percentage_in_range(
|
||||
float(self._config[CONF_TILT_OPEN_POSITION])
|
||||
)
|
||||
self.async_write_ha_state()
|
||||
|
@ -622,7 +580,7 @@ class MqttCover(MqttEntity, CoverEntity):
|
|||
self._config[CONF_ENCODING],
|
||||
)
|
||||
if self._tilt_optimistic:
|
||||
self._tilt_value = self.find_percentage_in_range(
|
||||
self._attr_current_cover_tilt_position = self.find_percentage_in_range(
|
||||
float(self._config[CONF_TILT_CLOSED_POSITION])
|
||||
)
|
||||
self.async_write_ha_state()
|
||||
|
@ -653,7 +611,7 @@ class MqttCover(MqttEntity, CoverEntity):
|
|||
)
|
||||
if self._tilt_optimistic:
|
||||
_LOGGER.debug("Set tilt value optimistic")
|
||||
self._tilt_value = percentage_tilt
|
||||
self._attr_current_cover_tilt_position = percentage_tilt
|
||||
self.async_write_ha_state()
|
||||
|
||||
async def async_set_cover_position(self, **kwargs: Any) -> None:
|
||||
|
@ -679,12 +637,12 @@ class MqttCover(MqttEntity, CoverEntity):
|
|||
self._config[CONF_ENCODING],
|
||||
)
|
||||
if self._optimistic:
|
||||
self._state = (
|
||||
self._update_state(
|
||||
STATE_CLOSED
|
||||
if percentage_position == self._config[CONF_POSITION_CLOSED]
|
||||
else STATE_OPEN
|
||||
)
|
||||
self._position = percentage_position
|
||||
self._attr_current_cover_position = percentage_position
|
||||
self.async_write_ha_state()
|
||||
|
||||
async def async_toggle_tilt(self, **kwargs: Any) -> None:
|
||||
|
@ -696,7 +654,7 @@ class MqttCover(MqttEntity, CoverEntity):
|
|||
|
||||
def is_tilt_closed(self) -> bool:
|
||||
"""Return if the cover is tilted closed."""
|
||||
return self._tilt_value == self.find_percentage_in_range(
|
||||
return self._attr_current_cover_tilt_position == self.find_percentage_in_range(
|
||||
float(self._config[CONF_TILT_CLOSED_POSITION])
|
||||
)
|
||||
|
||||
|
@ -762,7 +720,7 @@ class MqttCover(MqttEntity, CoverEntity):
|
|||
<= self._config[CONF_TILT_MIN]
|
||||
):
|
||||
level = self.find_percentage_in_range(payload)
|
||||
self._tilt_value = level
|
||||
self._attr_current_cover_tilt_position = level
|
||||
get_mqtt_data(self.hass).state_write_requests.write_state_request(self)
|
||||
else:
|
||||
_LOGGER.warning(
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue