Improve MQTT type hints / refactor part 10 - vacuum (#81253)
* Improve type hints vacuum __init__ and schema * Improve type hints and refactor templates legacy * Improve type hints state vacuum * Add hint for template parameters * Apply suggestions from code review Co-authored-by: epenet <6771947+epenet@users.noreply.github.com> * Some corrections * Remove stale constant * Use bitwise and * Follow-up comment * Remove incorrect type hint * Remove asserts * Cleanup asserts and reduce code Co-authored-by: epenet <6771947+epenet@users.noreply.github.com>
This commit is contained in:
parent
6bab63fb0a
commit
6975186f2f
5 changed files with 275 additions and 251 deletions
|
@ -1107,7 +1107,7 @@ class MqttEntity(
|
|||
payload: PublishPayloadType,
|
||||
qos: int = 0,
|
||||
retain: bool = False,
|
||||
encoding: str = DEFAULT_ENCODING,
|
||||
encoding: str | None = DEFAULT_ENCODING,
|
||||
) -> None:
|
||||
"""Publish message to an MQTT topic."""
|
||||
log_message(self.hass, self.entity_id, topic, payload, qos, retain)
|
||||
|
|
|
@ -11,8 +11,9 @@ from homeassistant.core import HomeAssistant
|
|||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
|
||||
|
||||
from ..const import CONF_SCHEMA
|
||||
from ..mixins import async_setup_entry_helper, async_setup_platform_helper
|
||||
from .schema import CONF_SCHEMA, LEGACY, MQTT_VACUUM_SCHEMA, STATE
|
||||
from .schema import LEGACY, MQTT_VACUUM_SCHEMA, STATE
|
||||
from .schema_legacy import (
|
||||
DISCOVERY_SCHEMA_LEGACY,
|
||||
PLATFORM_SCHEMA_LEGACY,
|
||||
|
@ -27,26 +28,29 @@ from .schema_state import (
|
|||
)
|
||||
|
||||
|
||||
def validate_mqtt_vacuum_discovery(value):
|
||||
def validate_mqtt_vacuum_discovery(config_value: ConfigType) -> ConfigType:
|
||||
"""Validate MQTT vacuum schema."""
|
||||
schemas = {LEGACY: DISCOVERY_SCHEMA_LEGACY, STATE: DISCOVERY_SCHEMA_STATE}
|
||||
return schemas[value[CONF_SCHEMA]](value)
|
||||
config: ConfigType = schemas[config_value[CONF_SCHEMA]](config_value)
|
||||
return config
|
||||
|
||||
|
||||
# Configuring MQTT Vacuums under the vacuum platform key is deprecated in HA Core 2022.6
|
||||
def validate_mqtt_vacuum(value):
|
||||
def validate_mqtt_vacuum(config_value: ConfigType) -> ConfigType:
|
||||
"""Validate MQTT vacuum schema (deprecated)."""
|
||||
schemas = {LEGACY: PLATFORM_SCHEMA_LEGACY, STATE: PLATFORM_SCHEMA_STATE}
|
||||
return schemas[value[CONF_SCHEMA]](value)
|
||||
config: ConfigType = schemas[config_value[CONF_SCHEMA]](config_value)
|
||||
return config
|
||||
|
||||
|
||||
def validate_mqtt_vacuum_modern(value):
|
||||
def validate_mqtt_vacuum_modern(config_value: ConfigType) -> ConfigType:
|
||||
"""Validate MQTT vacuum modern schema."""
|
||||
schemas = {
|
||||
LEGACY: PLATFORM_SCHEMA_LEGACY_MODERN,
|
||||
STATE: PLATFORM_SCHEMA_STATE_MODERN,
|
||||
}
|
||||
return schemas[value[CONF_SCHEMA]](value)
|
||||
config: ConfigType = schemas[config_value[CONF_SCHEMA]](config_value)
|
||||
return config
|
||||
|
||||
|
||||
DISCOVERY_SCHEMA = vol.All(
|
||||
|
@ -96,8 +100,8 @@ async def _async_setup_entity(
|
|||
hass: HomeAssistant,
|
||||
async_add_entities: AddEntitiesCallback,
|
||||
config: ConfigType,
|
||||
config_entry: ConfigEntry | None = None,
|
||||
discovery_data: dict | None = None,
|
||||
config_entry: ConfigEntry,
|
||||
discovery_data: DiscoveryInfoType | None = None,
|
||||
) -> None:
|
||||
"""Set up the MQTT vacuum."""
|
||||
setup_entity = {
|
||||
|
|
|
@ -1,6 +1,10 @@
|
|||
"""Shared schema code."""
|
||||
from __future__ import annotations
|
||||
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.components.vacuum import VacuumEntityFeature
|
||||
|
||||
from ..const import CONF_SCHEMA
|
||||
|
||||
LEGACY = "legacy"
|
||||
|
@ -15,18 +19,23 @@ MQTT_VACUUM_SCHEMA = vol.Schema(
|
|||
)
|
||||
|
||||
|
||||
def services_to_strings(services, service_to_string):
|
||||
def services_to_strings(
|
||||
services: VacuumEntityFeature | int,
|
||||
service_to_string: dict[VacuumEntityFeature, str],
|
||||
) -> list[str]:
|
||||
"""Convert SUPPORT_* service bitmask to list of service strings."""
|
||||
strings = []
|
||||
for service in service_to_string:
|
||||
if service & services:
|
||||
strings.append(service_to_string[service])
|
||||
return strings
|
||||
return [
|
||||
service_to_string[service]
|
||||
for service in service_to_string
|
||||
if service & services
|
||||
]
|
||||
|
||||
|
||||
def strings_to_services(strings, string_to_service):
|
||||
def strings_to_services(
|
||||
strings: list[str], string_to_service: dict[str, VacuumEntityFeature]
|
||||
) -> VacuumEntityFeature | int:
|
||||
"""Convert service strings to SUPPORT_* service bitmask."""
|
||||
services = 0
|
||||
services: VacuumEntityFeature | int = 0
|
||||
for string in strings:
|
||||
services |= string_to_service[string]
|
||||
return services
|
||||
|
|
|
@ -1,4 +1,9 @@
|
|||
"""Support for Legacy MQTT vacuum."""
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Callable
|
||||
from typing import Any
|
||||
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.components.vacuum import (
|
||||
|
@ -8,18 +13,26 @@ from homeassistant.components.vacuum import (
|
|||
VacuumEntity,
|
||||
VacuumEntityFeature,
|
||||
)
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.const import ATTR_SUPPORTED_FEATURES, CONF_NAME
|
||||
from homeassistant.core import callback
|
||||
from homeassistant.core import HomeAssistant, callback
|
||||
import homeassistant.helpers.config_validation as cv
|
||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||
from homeassistant.helpers.icon import icon_for_battery_level
|
||||
from homeassistant.helpers.json import json_dumps
|
||||
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
|
||||
|
||||
from .. import subscription
|
||||
from ..config import MQTT_BASE_SCHEMA
|
||||
from ..const import CONF_COMMAND_TOPIC, CONF_ENCODING, CONF_QOS, CONF_RETAIN
|
||||
from ..debug_info import log_messages
|
||||
from ..mixins import MQTT_ENTITY_COMMON_SCHEMA, MqttEntity, warn_for_legacy_schema
|
||||
from ..models import MqttValueTemplate, PayloadSentinel, ReceiveMessage
|
||||
from ..models import (
|
||||
MqttValueTemplate,
|
||||
PayloadSentinel,
|
||||
ReceiveMessage,
|
||||
ReceivePayloadType,
|
||||
)
|
||||
from ..util import get_mqtt_data, valid_publish_topic
|
||||
from .const import MQTT_VACUUM_ATTRIBUTES_BLOCKED
|
||||
from .schema import MQTT_VACUUM_SCHEMA, services_to_strings, strings_to_services
|
||||
|
@ -158,9 +171,45 @@ DISCOVERY_SCHEMA_LEGACY = PLATFORM_SCHEMA_LEGACY_MODERN.extend(
|
|||
)
|
||||
|
||||
|
||||
_COMMANDS = {
|
||||
VacuumEntityFeature.TURN_ON: {
|
||||
"payload": CONF_PAYLOAD_TURN_ON,
|
||||
"status": "Cleaning",
|
||||
},
|
||||
VacuumEntityFeature.TURN_OFF: {
|
||||
"payload": CONF_PAYLOAD_TURN_OFF,
|
||||
"status": "Turning Off",
|
||||
},
|
||||
VacuumEntityFeature.STOP: {
|
||||
"payload": CONF_PAYLOAD_STOP,
|
||||
"status": "Stopping the current task",
|
||||
},
|
||||
VacuumEntityFeature.CLEAN_SPOT: {
|
||||
"payload": CONF_PAYLOAD_CLEAN_SPOT,
|
||||
"status": "Cleaning spot",
|
||||
},
|
||||
VacuumEntityFeature.LOCATE: {
|
||||
"payload": CONF_PAYLOAD_LOCATE,
|
||||
"status": "Hi, I'm over here!",
|
||||
},
|
||||
VacuumEntityFeature.PAUSE: {
|
||||
"payload": CONF_PAYLOAD_START_PAUSE,
|
||||
"status": "Pausing/Resuming cleaning...",
|
||||
},
|
||||
VacuumEntityFeature.RETURN_HOME: {
|
||||
"payload": CONF_PAYLOAD_RETURN_TO_BASE,
|
||||
"status": "Returning home...",
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
async def async_setup_entity_legacy(
|
||||
hass, config, async_add_entities, config_entry, discovery_data
|
||||
):
|
||||
hass: HomeAssistant,
|
||||
config: ConfigType,
|
||||
async_add_entities: AddEntitiesCallback,
|
||||
config_entry: ConfigEntry,
|
||||
discovery_data: DiscoveryInfoType | None,
|
||||
) -> None:
|
||||
"""Set up a MQTT Vacuum Legacy."""
|
||||
async_add_entities([MqttVacuum(hass, config, config_entry, discovery_data)])
|
||||
|
||||
|
@ -171,24 +220,42 @@ class MqttVacuum(MqttEntity, VacuumEntity):
|
|||
_entity_id_format = ENTITY_ID_FORMAT
|
||||
_attributes_extra_blocked = MQTT_LEGACY_VACUUM_ATTRIBUTES_BLOCKED
|
||||
|
||||
def __init__(self, hass, config, config_entry, discovery_data):
|
||||
_encoding: str | None
|
||||
_qos: bool
|
||||
_retain: bool
|
||||
_payloads: dict[str, str]
|
||||
_send_command_topic: str | None
|
||||
_set_fan_speed_topic: str | None
|
||||
_state_topics: dict[str, str | None]
|
||||
_templates: dict[
|
||||
str, Callable[[ReceivePayloadType, PayloadSentinel], ReceivePayloadType]
|
||||
]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
hass: HomeAssistant,
|
||||
config: ConfigType,
|
||||
config_entry: ConfigEntry,
|
||||
discovery_data: DiscoveryInfoType | None,
|
||||
) -> None:
|
||||
"""Initialize the vacuum."""
|
||||
self._attr_battery_level = 0
|
||||
self._attr_is_on = False
|
||||
self._attr_fan_speed = "unknown"
|
||||
|
||||
self._charging = False
|
||||
self._cleaning = False
|
||||
self._docked = False
|
||||
self._error = None
|
||||
self._error: str | None = None
|
||||
|
||||
MqttEntity.__init__(self, hass, config, config_entry, discovery_data)
|
||||
|
||||
@staticmethod
|
||||
def config_schema():
|
||||
def config_schema() -> vol.Schema:
|
||||
"""Return the config schema."""
|
||||
return DISCOVERY_SCHEMA_LEGACY
|
||||
|
||||
def _setup_from_config(self, config):
|
||||
def _setup_from_config(self, config: ConfigType) -> None:
|
||||
"""(Re)Setup the entity."""
|
||||
supported_feature_strings = config[CONF_SUPPORTED_FEATURES]
|
||||
self._attr_supported_features = strings_to_services(
|
||||
|
@ -204,7 +271,7 @@ class MqttVacuum(MqttEntity, VacuumEntity):
|
|||
self._send_command_topic = config.get(CONF_SEND_COMMAND_TOPIC)
|
||||
|
||||
self._payloads = {
|
||||
key: config.get(key)
|
||||
key: config[key]
|
||||
for key in (
|
||||
CONF_PAYLOAD_TURN_ON,
|
||||
CONF_PAYLOAD_TURN_OFF,
|
||||
|
@ -227,7 +294,9 @@ class MqttVacuum(MqttEntity, VacuumEntity):
|
|||
)
|
||||
}
|
||||
self._templates = {
|
||||
key: config.get(key)
|
||||
key: MqttValueTemplate(
|
||||
config[key], entity=self
|
||||
).async_render_with_possible_json_value
|
||||
for key in (
|
||||
CONF_BATTERY_LEVEL_TEMPLATE,
|
||||
CONF_CHARGING_TEMPLATE,
|
||||
|
@ -236,13 +305,11 @@ class MqttVacuum(MqttEntity, VacuumEntity):
|
|||
CONF_ERROR_TEMPLATE,
|
||||
CONF_FAN_SPEED_TEMPLATE,
|
||||
)
|
||||
if key in config
|
||||
}
|
||||
|
||||
def _prepare_subscribe_topics(self):
|
||||
def _prepare_subscribe_topics(self) -> None:
|
||||
"""(Re)Subscribe to topics."""
|
||||
for tpl in self._templates.values():
|
||||
if tpl is not None:
|
||||
tpl = MqttValueTemplate(tpl, entity=self)
|
||||
|
||||
@callback
|
||||
@log_messages(self.hass, self.entity_id)
|
||||
|
@ -250,11 +317,9 @@ class MqttVacuum(MqttEntity, VacuumEntity):
|
|||
"""Handle new MQTT message."""
|
||||
if (
|
||||
msg.topic == self._state_topics[CONF_BATTERY_LEVEL_TOPIC]
|
||||
and self._templates[CONF_BATTERY_LEVEL_TEMPLATE]
|
||||
and CONF_BATTERY_LEVEL_TEMPLATE in self._config
|
||||
):
|
||||
battery_level = self._templates[
|
||||
CONF_BATTERY_LEVEL_TEMPLATE
|
||||
].async_render_with_possible_json_value(
|
||||
battery_level = self._templates[CONF_BATTERY_LEVEL_TEMPLATE](
|
||||
msg.payload, PayloadSentinel.DEFAULT
|
||||
)
|
||||
if battery_level and battery_level is not PayloadSentinel.DEFAULT:
|
||||
|
@ -262,11 +327,9 @@ class MqttVacuum(MqttEntity, VacuumEntity):
|
|||
|
||||
if (
|
||||
msg.topic == self._state_topics[CONF_CHARGING_TOPIC]
|
||||
and self._templates[CONF_CHARGING_TEMPLATE]
|
||||
and CONF_CHARGING_TEMPLATE in self._templates
|
||||
):
|
||||
charging = self._templates[
|
||||
CONF_CHARGING_TEMPLATE
|
||||
].async_render_with_possible_json_value(
|
||||
charging = self._templates[CONF_CHARGING_TEMPLATE](
|
||||
msg.payload, PayloadSentinel.DEFAULT
|
||||
)
|
||||
if charging and charging is not PayloadSentinel.DEFAULT:
|
||||
|
@ -274,11 +337,9 @@ class MqttVacuum(MqttEntity, VacuumEntity):
|
|||
|
||||
if (
|
||||
msg.topic == self._state_topics[CONF_CLEANING_TOPIC]
|
||||
and self._templates[CONF_CLEANING_TEMPLATE]
|
||||
and CONF_CLEANING_TEMPLATE in self._config
|
||||
):
|
||||
cleaning = self._templates[
|
||||
CONF_CLEANING_TEMPLATE
|
||||
].async_render_with_possible_json_value(
|
||||
cleaning = self._templates[CONF_CLEANING_TEMPLATE](
|
||||
msg.payload, PayloadSentinel.DEFAULT
|
||||
)
|
||||
if cleaning and cleaning is not PayloadSentinel.DEFAULT:
|
||||
|
@ -286,11 +347,9 @@ class MqttVacuum(MqttEntity, VacuumEntity):
|
|||
|
||||
if (
|
||||
msg.topic == self._state_topics[CONF_DOCKED_TOPIC]
|
||||
and self._templates[CONF_DOCKED_TEMPLATE]
|
||||
and CONF_DOCKED_TEMPLATE in self._config
|
||||
):
|
||||
docked = self._templates[
|
||||
CONF_DOCKED_TEMPLATE
|
||||
].async_render_with_possible_json_value(
|
||||
docked = self._templates[CONF_DOCKED_TEMPLATE](
|
||||
msg.payload, PayloadSentinel.DEFAULT
|
||||
)
|
||||
if docked and docked is not PayloadSentinel.DEFAULT:
|
||||
|
@ -298,11 +357,9 @@ class MqttVacuum(MqttEntity, VacuumEntity):
|
|||
|
||||
if (
|
||||
msg.topic == self._state_topics[CONF_ERROR_TOPIC]
|
||||
and self._templates[CONF_ERROR_TEMPLATE]
|
||||
and CONF_ERROR_TEMPLATE in self._config
|
||||
):
|
||||
error = self._templates[
|
||||
CONF_ERROR_TEMPLATE
|
||||
].async_render_with_possible_json_value(
|
||||
error = self._templates[CONF_ERROR_TEMPLATE](
|
||||
msg.payload, PayloadSentinel.DEFAULT
|
||||
)
|
||||
if error is not PayloadSentinel.DEFAULT:
|
||||
|
@ -322,15 +379,13 @@ class MqttVacuum(MqttEntity, VacuumEntity):
|
|||
|
||||
if (
|
||||
msg.topic == self._state_topics[CONF_FAN_SPEED_TOPIC]
|
||||
and self._templates[CONF_FAN_SPEED_TEMPLATE]
|
||||
and CONF_FAN_SPEED_TEMPLATE in self._config
|
||||
):
|
||||
fan_speed = self._templates[
|
||||
CONF_FAN_SPEED_TEMPLATE
|
||||
].async_render_with_possible_json_value(
|
||||
fan_speed = self._templates[CONF_FAN_SPEED_TEMPLATE](
|
||||
msg.payload, PayloadSentinel.DEFAULT
|
||||
)
|
||||
if fan_speed and fan_speed is not PayloadSentinel.DEFAULT:
|
||||
self._attr_fan_speed = fan_speed
|
||||
self._attr_fan_speed = str(fan_speed)
|
||||
|
||||
get_mqtt_data(self.hass).state_write_requests.write_state_request(self)
|
||||
|
||||
|
@ -349,12 +404,12 @@ class MqttVacuum(MqttEntity, VacuumEntity):
|
|||
},
|
||||
)
|
||||
|
||||
async def _subscribe_topics(self):
|
||||
async def _subscribe_topics(self) -> None:
|
||||
"""(Re)Subscribe to topics."""
|
||||
await subscription.async_subscribe_topics(self.hass, self._sub_state)
|
||||
|
||||
@property
|
||||
def battery_icon(self):
|
||||
def battery_icon(self) -> str:
|
||||
"""Return the battery icon for the vacuum cleaner.
|
||||
|
||||
No need to check VacuumEntityFeature.BATTERY, this won't be called if battery_level is None.
|
||||
|
@ -363,116 +418,57 @@ class MqttVacuum(MqttEntity, VacuumEntity):
|
|||
battery_level=self.battery_level, charging=self._charging
|
||||
)
|
||||
|
||||
async def async_turn_on(self, **kwargs):
|
||||
"""Turn the vacuum on."""
|
||||
if self.supported_features & VacuumEntityFeature.TURN_ON == 0:
|
||||
async def _async_publish_command(self, feature: VacuumEntityFeature) -> None:
|
||||
"""Check for a missing feature or command topic."""
|
||||
|
||||
if self._command_topic is None or self.supported_features & feature == 0:
|
||||
return
|
||||
|
||||
await self.async_publish(
|
||||
self._command_topic,
|
||||
self._payloads[CONF_PAYLOAD_TURN_ON],
|
||||
self._qos,
|
||||
self._retain,
|
||||
self._encoding,
|
||||
self._payloads[_COMMANDS[feature]["payload"]],
|
||||
qos=self._qos,
|
||||
retain=self._retain,
|
||||
encoding=self._encoding,
|
||||
)
|
||||
self._attr_status = "Cleaning"
|
||||
self._attr_status = _COMMANDS[feature]["status"]
|
||||
self.async_write_ha_state()
|
||||
|
||||
async def async_turn_off(self, **kwargs):
|
||||
async def async_turn_on(self, **kwargs: Any) -> None:
|
||||
"""Turn the vacuum on."""
|
||||
await self._async_publish_command(VacuumEntityFeature.TURN_ON)
|
||||
|
||||
async def async_turn_off(self, **kwargs: Any) -> None:
|
||||
"""Turn the vacuum off."""
|
||||
if self.supported_features & VacuumEntityFeature.TURN_OFF == 0:
|
||||
return None
|
||||
await self._async_publish_command(VacuumEntityFeature.TURN_OFF)
|
||||
|
||||
await self.async_publish(
|
||||
self._command_topic,
|
||||
self._payloads[CONF_PAYLOAD_TURN_OFF],
|
||||
self._qos,
|
||||
self._retain,
|
||||
self._encoding,
|
||||
)
|
||||
self._attr_status = "Turning Off"
|
||||
self.async_write_ha_state()
|
||||
|
||||
async def async_stop(self, **kwargs):
|
||||
async def async_stop(self, **kwargs: Any) -> None:
|
||||
"""Stop the vacuum."""
|
||||
if self.supported_features & VacuumEntityFeature.STOP == 0:
|
||||
return None
|
||||
await self._async_publish_command(VacuumEntityFeature.STOP)
|
||||
|
||||
await self.async_publish(
|
||||
self._command_topic,
|
||||
self._payloads[CONF_PAYLOAD_STOP],
|
||||
self._qos,
|
||||
self._retain,
|
||||
self._encoding,
|
||||
)
|
||||
self._attr_status = "Stopping the current task"
|
||||
self.async_write_ha_state()
|
||||
|
||||
async def async_clean_spot(self, **kwargs):
|
||||
async def async_clean_spot(self, **kwargs: Any) -> None:
|
||||
"""Perform a spot clean-up."""
|
||||
if self.supported_features & VacuumEntityFeature.CLEAN_SPOT == 0:
|
||||
return None
|
||||
await self._async_publish_command(VacuumEntityFeature.CLEAN_SPOT)
|
||||
|
||||
await self.async_publish(
|
||||
self._command_topic,
|
||||
self._payloads[CONF_PAYLOAD_CLEAN_SPOT],
|
||||
self._qos,
|
||||
self._retain,
|
||||
self._encoding,
|
||||
)
|
||||
self._attr_status = "Cleaning spot"
|
||||
self.async_write_ha_state()
|
||||
|
||||
async def async_locate(self, **kwargs):
|
||||
async def async_locate(self, **kwargs: Any) -> None:
|
||||
"""Locate the vacuum (usually by playing a song)."""
|
||||
if self.supported_features & VacuumEntityFeature.LOCATE == 0:
|
||||
return None
|
||||
await self._async_publish_command(VacuumEntityFeature.LOCATE)
|
||||
|
||||
await self.async_publish(
|
||||
self._command_topic,
|
||||
self._payloads[CONF_PAYLOAD_LOCATE],
|
||||
self._qos,
|
||||
self._retain,
|
||||
self._encoding,
|
||||
)
|
||||
self._attr_status = "Hi, I'm over here!"
|
||||
self.async_write_ha_state()
|
||||
|
||||
async def async_start_pause(self, **kwargs):
|
||||
async def async_start_pause(self, **kwargs: Any) -> None:
|
||||
"""Start, pause or resume the cleaning task."""
|
||||
if self.supported_features & VacuumEntityFeature.PAUSE == 0:
|
||||
return None
|
||||
await self._async_publish_command(VacuumEntityFeature.PAUSE)
|
||||
|
||||
await self.async_publish(
|
||||
self._command_topic,
|
||||
self._payloads[CONF_PAYLOAD_START_PAUSE],
|
||||
self._qos,
|
||||
self._retain,
|
||||
self._encoding,
|
||||
)
|
||||
self._attr_status = "Pausing/Resuming cleaning..."
|
||||
self.async_write_ha_state()
|
||||
|
||||
async def async_return_to_base(self, **kwargs):
|
||||
async def async_return_to_base(self, **kwargs: Any) -> None:
|
||||
"""Tell the vacuum to return to its dock."""
|
||||
if self.supported_features & VacuumEntityFeature.RETURN_HOME == 0:
|
||||
return None
|
||||
await self._async_publish_command(VacuumEntityFeature.RETURN_HOME)
|
||||
|
||||
await self.async_publish(
|
||||
self._command_topic,
|
||||
self._payloads[CONF_PAYLOAD_RETURN_TO_BASE],
|
||||
self._qos,
|
||||
self._retain,
|
||||
self._encoding,
|
||||
)
|
||||
self._attr_status = "Returning home..."
|
||||
self.async_write_ha_state()
|
||||
|
||||
async def async_set_fan_speed(self, fan_speed, **kwargs):
|
||||
async def async_set_fan_speed(self, fan_speed: str, **kwargs: Any) -> None:
|
||||
"""Set fan speed."""
|
||||
if (
|
||||
self.supported_features & VacuumEntityFeature.FAN_SPEED == 0
|
||||
) or fan_speed not in self.fan_speed_list:
|
||||
self._set_fan_speed_topic is None
|
||||
or (self.supported_features & VacuumEntityFeature.FAN_SPEED == 0)
|
||||
or fan_speed not in self.fan_speed_list
|
||||
):
|
||||
return None
|
||||
|
||||
await self.async_publish(
|
||||
|
@ -485,22 +481,30 @@ class MqttVacuum(MqttEntity, VacuumEntity):
|
|||
self._attr_status = f"Setting fan to {fan_speed}..."
|
||||
self.async_write_ha_state()
|
||||
|
||||
async def async_send_command(self, command, params=None, **kwargs):
|
||||
async def async_send_command(
|
||||
self,
|
||||
command: str,
|
||||
params: dict[str, Any] | list[Any] | None = None,
|
||||
**kwargs: Any,
|
||||
) -> None:
|
||||
"""Send a command to a vacuum cleaner."""
|
||||
if self.supported_features & VacuumEntityFeature.SEND_COMMAND == 0:
|
||||
if (
|
||||
self._send_command_topic is None
|
||||
or self.supported_features & VacuumEntityFeature.SEND_COMMAND == 0
|
||||
):
|
||||
return
|
||||
if params:
|
||||
message = {"command": command}
|
||||
message: dict[str, Any] = {"command": command}
|
||||
message.update(params)
|
||||
message = json_dumps(message)
|
||||
message_payload = json_dumps(message)
|
||||
else:
|
||||
message = command
|
||||
message_payload = command
|
||||
await self.async_publish(
|
||||
self._send_command_topic,
|
||||
message,
|
||||
message_payload,
|
||||
self._qos,
|
||||
self._retain,
|
||||
self._encoding,
|
||||
)
|
||||
self._attr_status = f"Sending command {message}..."
|
||||
self._attr_status = f"Sending command {message_payload}..."
|
||||
self.async_write_ha_state()
|
||||
|
|
|
@ -1,4 +1,8 @@
|
|||
"""Support for a State MQTT vacuum."""
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.components.vacuum import (
|
||||
|
@ -11,15 +15,18 @@ from homeassistant.components.vacuum import (
|
|||
StateVacuumEntity,
|
||||
VacuumEntityFeature,
|
||||
)
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.const import (
|
||||
ATTR_SUPPORTED_FEATURES,
|
||||
CONF_NAME,
|
||||
STATE_IDLE,
|
||||
STATE_PAUSED,
|
||||
)
|
||||
from homeassistant.core import callback
|
||||
from homeassistant.core import HomeAssistant, callback
|
||||
import homeassistant.helpers.config_validation as cv
|
||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||
from homeassistant.helpers.json import json_dumps, json_loads
|
||||
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
|
||||
|
||||
from .. import subscription
|
||||
from ..config import MQTT_BASE_SCHEMA
|
||||
|
@ -32,11 +39,12 @@ from ..const import (
|
|||
)
|
||||
from ..debug_info import log_messages
|
||||
from ..mixins import MQTT_ENTITY_COMMON_SCHEMA, MqttEntity, warn_for_legacy_schema
|
||||
from ..models import ReceiveMessage
|
||||
from ..util import get_mqtt_data, valid_publish_topic
|
||||
from .const import MQTT_VACUUM_ATTRIBUTES_BLOCKED
|
||||
from .schema import MQTT_VACUUM_SCHEMA, services_to_strings, strings_to_services
|
||||
|
||||
SERVICE_TO_STRING = {
|
||||
SERVICE_TO_STRING: dict[VacuumEntityFeature, str] = {
|
||||
VacuumEntityFeature.START: "start",
|
||||
VacuumEntityFeature.PAUSE: "pause",
|
||||
VacuumEntityFeature.STOP: "stop",
|
||||
|
@ -52,7 +60,7 @@ SERVICE_TO_STRING = {
|
|||
STRING_TO_SERVICE = {v: k for k, v in SERVICE_TO_STRING.items()}
|
||||
|
||||
|
||||
DEFAULT_SERVICES = (
|
||||
DEFAULT_SERVICES: VacuumEntityFeature | int = (
|
||||
VacuumEntityFeature.START
|
||||
| VacuumEntityFeature.STOP
|
||||
| VacuumEntityFeature.RETURN_HOME
|
||||
|
@ -60,7 +68,7 @@ DEFAULT_SERVICES = (
|
|||
| VacuumEntityFeature.BATTERY
|
||||
| VacuumEntityFeature.CLEAN_SPOT
|
||||
)
|
||||
ALL_SERVICES = (
|
||||
ALL_SERVICES: VacuumEntityFeature | int = (
|
||||
DEFAULT_SERVICES
|
||||
| VacuumEntityFeature.PAUSE
|
||||
| VacuumEntityFeature.LOCATE
|
||||
|
@ -72,7 +80,7 @@ BATTERY = "battery_level"
|
|||
FAN_SPEED = "fan_speed"
|
||||
STATE = "state"
|
||||
|
||||
POSSIBLE_STATES = {
|
||||
POSSIBLE_STATES: dict[str, str] = {
|
||||
STATE_IDLE: STATE_IDLE,
|
||||
STATE_DOCKED: STATE_DOCKED,
|
||||
STATE_ERROR: STATE_ERROR,
|
||||
|
@ -104,6 +112,15 @@ DEFAULT_PAYLOAD_LOCATE = "locate"
|
|||
DEFAULT_PAYLOAD_START = "start"
|
||||
DEFAULT_PAYLOAD_PAUSE = "pause"
|
||||
|
||||
_FEATURE_PAYLOADS = {
|
||||
VacuumEntityFeature.START: CONF_PAYLOAD_START,
|
||||
VacuumEntityFeature.STOP: CONF_PAYLOAD_STOP,
|
||||
VacuumEntityFeature.PAUSE: CONF_PAYLOAD_PAUSE,
|
||||
VacuumEntityFeature.CLEAN_SPOT: CONF_PAYLOAD_CLEAN_SPOT,
|
||||
VacuumEntityFeature.LOCATE: CONF_PAYLOAD_LOCATE,
|
||||
VacuumEntityFeature.RETURN_HOME: CONF_PAYLOAD_RETURN_TO_BASE,
|
||||
}
|
||||
|
||||
PLATFORM_SCHEMA_STATE_MODERN = (
|
||||
MQTT_BASE_SCHEMA.extend(
|
||||
{
|
||||
|
@ -147,8 +164,12 @@ DISCOVERY_SCHEMA_STATE = PLATFORM_SCHEMA_STATE_MODERN.extend({}, extra=vol.REMOV
|
|||
|
||||
|
||||
async def async_setup_entity_state(
|
||||
hass, config, async_add_entities, config_entry, discovery_data
|
||||
):
|
||||
hass: HomeAssistant,
|
||||
config: ConfigType,
|
||||
async_add_entities: AddEntitiesCallback,
|
||||
config_entry: ConfigEntry,
|
||||
discovery_data: DiscoveryInfoType | None,
|
||||
) -> None:
|
||||
"""Set up a State MQTT Vacuum."""
|
||||
async_add_entities([MqttStateVacuum(hass, config, config_entry, discovery_data)])
|
||||
|
||||
|
@ -159,20 +180,30 @@ class MqttStateVacuum(MqttEntity, StateVacuumEntity):
|
|||
_entity_id_format = ENTITY_ID_FORMAT
|
||||
_attributes_extra_blocked = MQTT_VACUUM_ATTRIBUTES_BLOCKED
|
||||
|
||||
def __init__(self, hass, config, config_entry, discovery_data):
|
||||
_set_fan_speed_topic: str | None
|
||||
_send_command_topic: str | None
|
||||
_payloads: dict[str, str | None]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
hass: HomeAssistant,
|
||||
config: ConfigType,
|
||||
config_entry: ConfigEntry,
|
||||
discovery_data: DiscoveryInfoType | None,
|
||||
) -> None:
|
||||
"""Initialize the vacuum."""
|
||||
self._state_attrs = {}
|
||||
self._state_attrs: dict[str, Any] = {}
|
||||
|
||||
MqttEntity.__init__(self, hass, config, config_entry, discovery_data)
|
||||
|
||||
@staticmethod
|
||||
def config_schema():
|
||||
def config_schema() -> vol.Schema:
|
||||
"""Return the config schema."""
|
||||
return DISCOVERY_SCHEMA_STATE
|
||||
|
||||
def _setup_from_config(self, config):
|
||||
def _setup_from_config(self, config: ConfigType) -> None:
|
||||
"""(Re)Setup the entity."""
|
||||
supported_feature_strings = config[CONF_SUPPORTED_FEATURES]
|
||||
supported_feature_strings: list[str] = config[CONF_SUPPORTED_FEATURES]
|
||||
self._attr_supported_features = strings_to_services(
|
||||
supported_feature_strings, STRING_TO_SERVICE
|
||||
)
|
||||
|
@ -193,21 +224,21 @@ class MqttStateVacuum(MqttEntity, StateVacuumEntity):
|
|||
)
|
||||
}
|
||||
|
||||
def _update_state_attributes(self, payload):
|
||||
def _update_state_attributes(self, payload: dict[str, Any]) -> None:
|
||||
"""Update the entity state attributes."""
|
||||
self._state_attrs.update(payload)
|
||||
self._attr_fan_speed = self._state_attrs.get(FAN_SPEED, 0)
|
||||
self._attr_battery_level = max(0, min(100, self._state_attrs.get(BATTERY, 0)))
|
||||
|
||||
def _prepare_subscribe_topics(self):
|
||||
def _prepare_subscribe_topics(self) -> None:
|
||||
"""(Re)Subscribe to topics."""
|
||||
topics = {}
|
||||
topics: dict[str, Any] = {}
|
||||
|
||||
@callback
|
||||
@log_messages(self.hass, self.entity_id)
|
||||
def state_message_received(msg):
|
||||
def state_message_received(msg: ReceiveMessage) -> None:
|
||||
"""Handle state MQTT message."""
|
||||
payload = json_loads(msg.payload)
|
||||
payload: dict[str, Any] = json_loads(msg.payload)
|
||||
if STATE in payload and (
|
||||
payload[STATE] in POSSIBLE_STATES or payload[STATE] is None
|
||||
):
|
||||
|
@ -218,9 +249,9 @@ class MqttStateVacuum(MqttEntity, StateVacuumEntity):
|
|||
self._update_state_attributes(payload)
|
||||
get_mqtt_data(self.hass).state_write_requests.write_state_request(self)
|
||||
|
||||
if self._config.get(CONF_STATE_TOPIC):
|
||||
if state_topic := self._config.get(CONF_STATE_TOPIC):
|
||||
topics["state_position_topic"] = {
|
||||
"topic": self._config.get(CONF_STATE_TOPIC),
|
||||
"topic": state_topic,
|
||||
"msg_callback": state_message_received,
|
||||
"qos": self._config[CONF_QOS],
|
||||
"encoding": self._config[CONF_ENCODING] or None,
|
||||
|
@ -229,50 +260,54 @@ class MqttStateVacuum(MqttEntity, StateVacuumEntity):
|
|||
self.hass, self._sub_state, topics
|
||||
)
|
||||
|
||||
async def _subscribe_topics(self):
|
||||
async def _subscribe_topics(self) -> None:
|
||||
"""(Re)Subscribe to topics."""
|
||||
await subscription.async_subscribe_topics(self.hass, self._sub_state)
|
||||
|
||||
async def async_start(self):
|
||||
async def _async_publish_command(self, feature: VacuumEntityFeature) -> None:
|
||||
"""Check for a missing feature or command topic."""
|
||||
if self._command_topic is None or self.supported_features & feature == 0:
|
||||
return
|
||||
|
||||
await self.async_publish(
|
||||
self._command_topic,
|
||||
self._payloads[_FEATURE_PAYLOADS[feature]],
|
||||
qos=self._config[CONF_QOS],
|
||||
retain=self._config[CONF_RETAIN],
|
||||
encoding=self._config[CONF_ENCODING],
|
||||
)
|
||||
self.async_write_ha_state()
|
||||
|
||||
async def async_start(self) -> None:
|
||||
"""Start the vacuum."""
|
||||
if self.supported_features & VacuumEntityFeature.START == 0:
|
||||
return None
|
||||
await self.async_publish(
|
||||
self._command_topic,
|
||||
self._config[CONF_PAYLOAD_START],
|
||||
self._config[CONF_QOS],
|
||||
self._config[CONF_RETAIN],
|
||||
self._config[CONF_ENCODING],
|
||||
)
|
||||
await self._async_publish_command(VacuumEntityFeature.START)
|
||||
|
||||
async def async_pause(self):
|
||||
async def async_pause(self) -> None:
|
||||
"""Pause the vacuum."""
|
||||
if self.supported_features & VacuumEntityFeature.PAUSE == 0:
|
||||
return
|
||||
await self.async_publish(
|
||||
self._command_topic,
|
||||
self._config[CONF_PAYLOAD_PAUSE],
|
||||
self._config[CONF_QOS],
|
||||
self._config[CONF_RETAIN],
|
||||
self._config[CONF_ENCODING],
|
||||
)
|
||||
await self._async_publish_command(VacuumEntityFeature.PAUSE)
|
||||
|
||||
async def async_stop(self, **kwargs):
|
||||
async def async_stop(self, **kwargs: Any) -> None:
|
||||
"""Stop the vacuum."""
|
||||
if self.supported_features & VacuumEntityFeature.STOP == 0:
|
||||
return
|
||||
await self.async_publish(
|
||||
self._command_topic,
|
||||
self._config[CONF_PAYLOAD_STOP],
|
||||
self._config[CONF_QOS],
|
||||
self._config[CONF_RETAIN],
|
||||
self._config[CONF_ENCODING],
|
||||
)
|
||||
await self._async_publish_command(VacuumEntityFeature.STOP)
|
||||
|
||||
async def async_set_fan_speed(self, fan_speed, **kwargs):
|
||||
async def async_return_to_base(self, **kwargs: Any) -> None:
|
||||
"""Tell the vacuum to return to its dock."""
|
||||
await self._async_publish_command(VacuumEntityFeature.RETURN_HOME)
|
||||
|
||||
async def async_clean_spot(self, **kwargs: Any) -> None:
|
||||
"""Perform a spot clean-up."""
|
||||
await self._async_publish_command(VacuumEntityFeature.CLEAN_SPOT)
|
||||
|
||||
async def async_locate(self, **kwargs: Any) -> None:
|
||||
"""Locate the vacuum (usually by playing a song)."""
|
||||
await self._async_publish_command(VacuumEntityFeature.LOCATE)
|
||||
|
||||
async def async_set_fan_speed(self, fan_speed: str, **kwargs: Any) -> None:
|
||||
"""Set fan speed."""
|
||||
if (self.supported_features & VacuumEntityFeature.FAN_SPEED == 0) or (
|
||||
fan_speed not in self.fan_speed_list
|
||||
if (
|
||||
self._set_fan_speed_topic is None
|
||||
or (self.supported_features & VacuumEntityFeature.FAN_SPEED == 0)
|
||||
or (fan_speed not in self.fan_speed_list)
|
||||
):
|
||||
return
|
||||
await self.async_publish(
|
||||
|
@ -283,55 +318,27 @@ class MqttStateVacuum(MqttEntity, StateVacuumEntity):
|
|||
self._config[CONF_ENCODING],
|
||||
)
|
||||
|
||||
async def async_return_to_base(self, **kwargs):
|
||||
"""Tell the vacuum to return to its dock."""
|
||||
if self.supported_features & VacuumEntityFeature.RETURN_HOME == 0:
|
||||
return
|
||||
await self.async_publish(
|
||||
self._command_topic,
|
||||
self._config[CONF_PAYLOAD_RETURN_TO_BASE],
|
||||
self._config[CONF_QOS],
|
||||
self._config[CONF_RETAIN],
|
||||
self._config[CONF_ENCODING],
|
||||
)
|
||||
|
||||
async def async_clean_spot(self, **kwargs):
|
||||
"""Perform a spot clean-up."""
|
||||
if self.supported_features & VacuumEntityFeature.CLEAN_SPOT == 0:
|
||||
return
|
||||
await self.async_publish(
|
||||
self._command_topic,
|
||||
self._config[CONF_PAYLOAD_CLEAN_SPOT],
|
||||
self._config[CONF_QOS],
|
||||
self._config[CONF_RETAIN],
|
||||
self._config[CONF_ENCODING],
|
||||
)
|
||||
|
||||
async def async_locate(self, **kwargs):
|
||||
"""Locate the vacuum (usually by playing a song)."""
|
||||
if self.supported_features & VacuumEntityFeature.LOCATE == 0:
|
||||
return
|
||||
await self.async_publish(
|
||||
self._command_topic,
|
||||
self._config[CONF_PAYLOAD_LOCATE],
|
||||
self._config[CONF_QOS],
|
||||
self._config[CONF_RETAIN],
|
||||
self._config[CONF_ENCODING],
|
||||
)
|
||||
|
||||
async def async_send_command(self, command, params=None, **kwargs):
|
||||
async def async_send_command(
|
||||
self,
|
||||
command: str,
|
||||
params: dict[str, Any] | list[Any] | None = None,
|
||||
**kwargs: Any,
|
||||
) -> None:
|
||||
"""Send a command to a vacuum cleaner."""
|
||||
if self.supported_features & VacuumEntityFeature.SEND_COMMAND == 0:
|
||||
if (
|
||||
self._send_command_topic is None
|
||||
or self.supported_features & VacuumEntityFeature.SEND_COMMAND == 0
|
||||
):
|
||||
return
|
||||
if params:
|
||||
message = {"command": command}
|
||||
if isinstance(params, dict):
|
||||
message: dict[str, Any] = {"command": command}
|
||||
message.update(params)
|
||||
message = json_dumps(message)
|
||||
payload = json_dumps(message)
|
||||
else:
|
||||
message = command
|
||||
payload = command
|
||||
await self.async_publish(
|
||||
self._send_command_topic,
|
||||
message,
|
||||
payload,
|
||||
self._config[CONF_QOS],
|
||||
self._config[CONF_RETAIN],
|
||||
self._config[CONF_ENCODING],
|
||||
|
|
Loading…
Add table
Reference in a new issue