Xiaomi vacuum as platform of new vacuum component derived from ToggleEntity, and services (#8623)

* Xiaomi vacuum as component with switch, sensors and services

- Conversion from switch platform to async component.
- Add services proposed in #8416 to the new component, with shorter names.
- Add sensors for the vacuum robot as a selectable list from `battery`, `state`, `error`, `fanspeed`, `clean_time` and `clean_area` (the state attributes of the switch). The sensors don't poll, but listen to a signal to update the state, the switch fires this signal when updating.
- Assign default icons to sensors and the switch (`mdi:google-circles-group` looks like the robot!)

* path change in requirements_all (from switch platform to component)

* copy pasting is a bad habit

* services to the components services.yaml, modify .coveragerc

* review: use with multiple hosts, fix calls to async_add_devices, fix ranges for services

* `icon_for_battery_level` util method

* Xiaomi vacuum as platform of new component vacuum

- Created new component `vacuum` from a ToggleEntity.
- Add services `turn_on`, `turn_off`, `cleaning_play_pause`, `stop`, `return_to_base`, `locate`, `set_fanspeed` and `send_command`.
- Remove the main switch for the xiaomi vacuum (the toggable main entity is the switch).
- Add `support flags` for the common services
- Assign default icons to sensors and the switch (`mdi:google-circles-group` looks like the robot!)
- Move services descriptions to a yaml file for the new component.
- Update requirements_all.
- Update coveragerc.

* fix coveragerc

* fix battery icon helper to use more icons

* remove sensors, create properties and support flags for custom UI

* cleaning

* updated state_attrs for filtering in UI, renamed platform to simply `xiaomi`

* fix platform rename

* change fanspeed and expose `fanspeed_list` to use speed steps

* minor fixes

- Rename service `start_pause`
- Add 'Error' attribute only if `got_error`.
- Minor changes

* rename state attrs

* rename state attrs

* review changes: cut fan__speed, style changes, remove logging, and more

* add ATTR_COMMAND = 'command' to const

* pop entity_id from service data

* remove property accessor for vacuum object

* lint fix

* fix extra attrs names

* module level functions for calling the services

* params as optional keyword for `send_command`

* params as optional keyword for `send_command`, remove debug logs

* explicit parameters for `set_fan_speed` and `send_command`

* Demo platform for the vacuum component

* vacuum tests for the Demo platform

* some fixes

* don't omit vacuum

* vacuum tests for the Xiaomi platform

* fix test

* fix

* fix xiaomi test

* fix coveragerc

* test send command

* fix coveragerc

* fix string formatting

* The coverage is to low. It need 93% or more
This commit is contained in:
Eugenio Panadero 2017-08-04 15:27:10 +02:00 committed by Pascal Vizeli
parent 5b4e30cde3
commit 96f8c37dcd
15 changed files with 1495 additions and 131 deletions

View file

@ -535,7 +535,6 @@ omit =
homeassistant/components/switch/tplink.py
homeassistant/components/switch/transmission.py
homeassistant/components/switch/wake_on_lan.py
homeassistant/components/switch/xiaomi_vacuum.py
homeassistant/components/telegram_bot/*
homeassistant/components/thingspeak.py
homeassistant/components/tts/amazon_polly.py

View file

@ -66,7 +66,6 @@ ATTR_TYPE_ID = 'type_id'
ATTR_VENDOR_NAME = 'vendor_name'
ATTR_VENDOR_ID = 'vendor_id'
ATTR_DEVICE = 'device'
ATTR_COMMAND = 'command'
ATTR_TYPE = 'type'
ATTR_KEY = 'key'
ATTR_DUR = 'dur'

View file

@ -1,124 +0,0 @@
"""
Support for Xiaomi Vacuum cleaner robot.
For more details about this platform, please refer to the documentation
https://home-assistant.io/components/switch.xiaomi_vacuum/
"""
import logging
import voluptuous as vol
import homeassistant.helpers.config_validation as cv
from homeassistant.components.switch import SwitchDevice, PLATFORM_SCHEMA
from homeassistant.const import (DEVICE_DEFAULT_NAME,
CONF_NAME, CONF_HOST, CONF_TOKEN)
_LOGGER = logging.getLogger(__name__)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_TOKEN): vol.All(str, vol.Length(min=32, max=32)),
vol.Optional(CONF_NAME): cv.string,
})
REQUIREMENTS = ['python-mirobo==0.1.2']
# pylint: disable=unused-argument
def setup_platform(hass, config, add_devices_callback, discovery_info=None):
"""Set up the vacuum from config."""
host = config.get(CONF_HOST)
name = config.get(CONF_NAME)
token = config.get(CONF_TOKEN)
add_devices_callback([MiroboSwitch(name, host, token)], True)
class MiroboSwitch(SwitchDevice):
"""Representation of a Xiaomi Vacuum."""
def __init__(self, name, host, token):
"""Initialize the vacuum switch."""
self._name = name or DEVICE_DEFAULT_NAME
self._icon = 'mdi:broom'
self.host = host
self.token = token
self._vacuum = None
self._state = None
self._state_attrs = {}
self._is_on = False
@property
def name(self):
"""Return the name of the device if any."""
return self._name
@property
def icon(self):
"""Return the icon to use for device if any."""
return self._icon
@property
def available(self):
"""Return true when state is known."""
return self._state is not None
@property
def device_state_attributes(self):
"""Return the state attributes of the device."""
return self._state_attrs
@property
def is_on(self):
"""Return true if switch is on."""
return self._is_on
@property
def vacuum(self):
"""Property accessor for vacuum object."""
if not self._vacuum:
from mirobo import Vacuum
_LOGGER.info("initializing with host %s token %s",
self.host, self.token)
self._vacuum = Vacuum(self.host, self.token)
return self._vacuum
def turn_on(self, **kwargs):
"""Turn the vacuum on."""
from mirobo import VacuumException
try:
self.vacuum.start()
self._is_on = True
except VacuumException as ex:
_LOGGER.error("Unable to start the vacuum: %s", ex)
def turn_off(self, **kwargs):
"""Turn the vacuum off and return to home."""
from mirobo import VacuumException
try:
self.vacuum.stop()
self.vacuum.home()
self._is_on = False
except VacuumException as ex:
_LOGGER.error("Unable to turn off and return home: %s", ex)
def update(self):
"""Fetch state from the device."""
from mirobo import DeviceException
try:
state = self.vacuum.status()
_LOGGER.debug("got state from the vacuum: %s", state)
self._state_attrs = {
'Status': state.state, 'Error': state.error,
'Battery': state.battery, 'Fan': state.fanspeed,
'Cleaning time': str(state.clean_time),
'Cleaned area': state.clean_area}
self._state = state.state_code
self._is_on = state.is_on
except DeviceException as ex:
_LOGGER.error("Got exception while fetching the state: %s", ex)

View file

@ -15,11 +15,11 @@ from requests.auth import HTTPBasicAuth, HTTPDigestAuth
import voluptuous as vol
from homeassistant.components.notify import (
ATTR_MESSAGE, ATTR_TITLE, ATTR_DATA)
ATTR_DATA, ATTR_MESSAGE, ATTR_TITLE)
from homeassistant.config import load_yaml_config_file
from homeassistant.const import (
CONF_PLATFORM, CONF_API_KEY, CONF_TIMEOUT, ATTR_LATITUDE, ATTR_LONGITUDE,
HTTP_DIGEST_AUTHENTICATION)
ATTR_COMMAND, ATTR_LATITUDE, ATTR_LONGITUDE, CONF_API_KEY,
CONF_PLATFORM, CONF_TIMEOUT, HTTP_DIGEST_AUTHENTICATION)
import homeassistant.helpers.config_validation as cv
from homeassistant.exceptions import TemplateError
from homeassistant.setup import async_prepare_setup_platform
@ -35,7 +35,6 @@ ATTR_CALLBACK_QUERY_ID = 'callback_query_id'
ATTR_CAPTION = 'caption'
ATTR_CHAT_ID = 'chat_id'
ATTR_CHAT_INSTANCE = 'chat_instance'
ATTR_COMMAND = 'command'
ATTR_DISABLE_NOTIF = 'disable_notification'
ATTR_DISABLE_WEB_PREV = 'disable_web_page_preview'
ATTR_EDITED_MSG = 'edited_message'

View file

@ -0,0 +1,364 @@
"""
Support for vacuum cleaner robots (botvacs).
For more details about this platform, please refer to the documentation
https://home-assistant.io/components/vacuum/
"""
import asyncio
from datetime import timedelta
from functools import partial
import logging
import os
import voluptuous as vol
from homeassistant.components import group
from homeassistant.config import load_yaml_config_file
from homeassistant.const import (
ATTR_BATTERY_LEVEL, ATTR_COMMAND, ATTR_ENTITY_ID, SERVICE_TOGGLE,
SERVICE_TURN_OFF, SERVICE_TURN_ON, STATE_ON)
from homeassistant.loader import bind_hass
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.config_validation import PLATFORM_SCHEMA # noqa
from homeassistant.helpers.entity_component import EntityComponent
from homeassistant.helpers.entity import ToggleEntity
from homeassistant.util.icon import icon_for_battery_level
_LOGGER = logging.getLogger(__name__)
DOMAIN = 'vacuum'
DEPENDENCIES = ['group']
SCAN_INTERVAL = timedelta(seconds=20)
GROUP_NAME_ALL_VACUUMS = 'all vacuum cleaners'
ENTITY_ID_ALL_VACUUMS = group.ENTITY_ID_FORMAT.format('all_vacuum_cleaners')
ATTR_BATTERY_ICON = 'battery_icon'
ATTR_CLEANED_AREA = 'cleaned_area'
ATTR_FAN_SPEED = 'fan_speed'
ATTR_FAN_SPEED_LIST = 'fan_speed_list'
ATTR_PARAMS = 'params'
ATTR_STATUS = 'status'
SERVICE_LOCATE = 'locate'
SERVICE_RETURN_TO_BASE = 'return_to_base'
SERVICE_SEND_COMMAND = 'send_command'
SERVICE_SET_FAN_SPEED = 'set_fan_speed'
SERVICE_START_PAUSE = 'start_pause'
SERVICE_STOP = 'stop'
VACUUM_SERVICE_SCHEMA = vol.Schema({
vol.Optional(ATTR_ENTITY_ID): cv.entity_ids,
})
VACUUM_SET_FAN_SPEED_SERVICE_SCHEMA = VACUUM_SERVICE_SCHEMA.extend({
vol.Required(ATTR_FAN_SPEED): cv.string,
})
VACUUM_SEND_COMMAND_SERVICE_SCHEMA = VACUUM_SERVICE_SCHEMA.extend({
vol.Required(ATTR_COMMAND): cv.string,
vol.Optional(ATTR_PARAMS): cv.Dict,
})
SERVICE_TO_METHOD = {
SERVICE_TURN_ON: {'method': 'async_turn_on'},
SERVICE_TURN_OFF: {'method': 'async_turn_off'},
SERVICE_TOGGLE: {'method': 'async_toggle'},
SERVICE_START_PAUSE: {'method': 'async_start_pause'},
SERVICE_RETURN_TO_BASE: {'method': 'async_return_to_base'},
SERVICE_LOCATE: {'method': 'async_locate'},
SERVICE_STOP: {'method': 'async_stop'},
SERVICE_SET_FAN_SPEED: {'method': 'async_set_fan_speed',
'schema': VACUUM_SET_FAN_SPEED_SERVICE_SCHEMA},
SERVICE_SEND_COMMAND: {'method': 'async_send_command',
'schema': VACUUM_SEND_COMMAND_SERVICE_SCHEMA},
}
DEFAULT_NAME = 'Vacuum cleaner robot'
DEFAULT_ICON = 'mdi:google-circles-group'
SUPPORT_TURN_ON = 1
SUPPORT_TURN_OFF = 2
SUPPORT_PAUSE = 4
SUPPORT_STOP = 8
SUPPORT_RETURN_HOME = 16
SUPPORT_FAN_SPEED = 32
SUPPORT_BATTERY = 64
SUPPORT_STATUS = 128
SUPPORT_SEND_COMMAND = 256
SUPPORT_LOCATE = 512
SUPPORT_MAP = 1024
@bind_hass
def is_on(hass, entity_id=None):
"""Return if the vacuum is on based on the statemachine."""
entity_id = entity_id or ENTITY_ID_ALL_VACUUMS
return hass.states.is_state(entity_id, STATE_ON)
@bind_hass
def turn_on(hass, entity_id=None):
"""Turn all or specified vacuum on."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else None
hass.services.call(DOMAIN, SERVICE_TURN_ON, data)
@bind_hass
def turn_off(hass, entity_id=None):
"""Turn all or specified vacuum off."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else None
hass.services.call(DOMAIN, SERVICE_TURN_OFF, data)
@bind_hass
def toggle(hass, entity_id=None):
"""Toggle all or specified vacuum."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else None
hass.services.call(DOMAIN, SERVICE_TOGGLE, data)
@bind_hass
def locate(hass, entity_id=None):
"""Locate all or specified vacuum."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else None
hass.services.call(DOMAIN, SERVICE_LOCATE, data)
@bind_hass
def return_to_base(hass, entity_id=None):
"""Tell all or specified vacuum to return to base."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else None
hass.services.call(DOMAIN, SERVICE_RETURN_TO_BASE, data)
@bind_hass
def start_pause(hass, entity_id=None):
"""Tell all or specified vacuum to start or pause the current task."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else None
hass.services.call(DOMAIN, SERVICE_START_PAUSE, data)
@bind_hass
def stop(hass, entity_id=None):
"""Stop all or specified vacuum."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else None
hass.services.call(DOMAIN, SERVICE_STOP, data)
@bind_hass
def set_fan_speed(hass, fan_speed, entity_id=None):
"""Set fan speed for all or specified vacuum."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
data[ATTR_FAN_SPEED] = fan_speed
hass.services.call(DOMAIN, SERVICE_SET_FAN_SPEED, data)
@bind_hass
def send_command(hass, command, params=None, entity_id=None):
"""Send command to all or specified vacuum."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
data[ATTR_COMMAND] = command
if params is not None:
data[ATTR_PARAMS] = params
hass.services.call(DOMAIN, SERVICE_SEND_COMMAND, data)
@asyncio.coroutine
def async_setup(hass, config):
"""Set up the vacuum component."""
if not config[DOMAIN]:
return False
component = EntityComponent(
_LOGGER, DOMAIN, hass, SCAN_INTERVAL, GROUP_NAME_ALL_VACUUMS)
yield from component.async_setup(config)
descriptions = yield from hass.async_add_job(
load_yaml_config_file, os.path.join(
os.path.dirname(__file__), 'services.yaml'))
@asyncio.coroutine
def async_handle_vacuum_service(service):
"""Map services to methods on VacuumDevice."""
method = SERVICE_TO_METHOD.get(service.service)
if not method:
return
target_vacuums = component.async_extract_from_service(service)
params = service.data.copy()
params.pop(ATTR_ENTITY_ID, None)
update_tasks = []
for vacuum in target_vacuums:
yield from getattr(vacuum, method['method'])(**params)
if not vacuum.should_poll:
continue
update_coro = hass.async_add_job(
vacuum.async_update_ha_state(True))
if hasattr(vacuum, 'async_update'):
update_tasks.append(update_coro)
else:
yield from update_coro
if update_tasks:
yield from asyncio.wait(update_tasks, loop=hass.loop)
for service in SERVICE_TO_METHOD:
schema = SERVICE_TO_METHOD[service].get(
'schema', VACUUM_SERVICE_SCHEMA)
hass.services.async_register(
DOMAIN, service, async_handle_vacuum_service,
descriptions.get(service), schema=schema)
return True
class VacuumDevice(ToggleEntity):
"""Representation of a vacuum cleaner robot."""
@property
def supported_features(self):
"""Flag vacuum cleaner features that are supported."""
return 0
@property
def status(self):
"""Return the status of the vacuum cleaner."""
return None
@property
def battery_level(self):
"""Return the battery level of the vacuum cleaner."""
return None
@property
def battery_icon(self):
"""Return the battery icon for the vacuum cleaner."""
charging = False
if self.status is not None:
charging = 'charg' in self.status.lower()
return icon_for_battery_level(
battery_level=self.battery_level, charging=charging)
@property
def fan_speed(self):
"""Return the fan speed of the vacuum cleaner."""
return None
@property
def fan_speed_list(self) -> list:
"""Get the list of available fan speed steps of the vacuum cleaner."""
return []
@property
def state_attributes(self):
"""Return the state attributes of the vacuum cleaner."""
data = {}
if self.status is not None:
data[ATTR_STATUS] = self.status
if self.battery_level is not None:
data[ATTR_BATTERY_LEVEL] = self.battery_level
data[ATTR_BATTERY_ICON] = self.battery_icon
if self.fan_speed is not None:
data[ATTR_FAN_SPEED] = self.fan_speed
data[ATTR_FAN_SPEED_LIST] = self.fan_speed_list
return data
def turn_on(self, **kwargs):
"""Turn the vacuum on and start cleaning."""
raise NotImplementedError()
def async_turn_on(self, **kwargs):
"""Turn the vacuum on and start cleaning.
This method must be run in the event loop and returns a coroutine.
"""
return self.hass.async_add_job(partial(self.turn_on, **kwargs))
def turn_off(self, **kwargs):
"""Turn the vacuum off stopping the cleaning and returning home."""
raise NotImplementedError()
def async_turn_off(self, **kwargs):
"""Turn the vacuum off stopping the cleaning and returning home.
This method must be run in the event loop and returns a coroutine.
"""
return self.hass.async_add_job(partial(self.turn_off, **kwargs))
def return_to_base(self, **kwargs):
"""Set the vacuum cleaner to return to the dock."""
raise NotImplementedError()
def async_return_to_base(self, **kwargs):
"""Set the vacuum cleaner to return to the dock.
This method must be run in the event loop and returns a coroutine.
"""
return self.hass.async_add_job(partial(self.return_to_base, **kwargs))
def stop(self, **kwargs):
"""Stop the vacuum cleaner."""
raise NotImplementedError()
def async_stop(self, **kwargs):
"""Stop the vacuum cleaner.
This method must be run in the event loop and returns a coroutine.
"""
return self.hass.async_add_job(partial(self.stop, **kwargs))
def locate(self, **kwargs):
"""Locate the vacuum cleaner."""
raise NotImplementedError()
def async_locate(self, **kwargs):
"""Locate the vacuum cleaner.
This method must be run in the event loop and returns a coroutine.
"""
return self.hass.async_add_job(partial(self.locate, **kwargs))
def set_fan_speed(self, fan_speed, **kwargs):
"""Set fan speed."""
raise NotImplementedError()
def async_set_fan_speed(self, fan_speed, **kwargs):
"""Set fan speed.
This method must be run in the event loop and returns a coroutine.
"""
return self.hass.async_add_job(
partial(self.set_fan_speed, fan_speed, **kwargs))
def start_pause(self, **kwargs):
"""Start, pause or resume the cleaning task."""
raise NotImplementedError()
def async_start_pause(self, **kwargs):
"""Start, pause or resume the cleaning task.
This method must be run in the event loop and returns a coroutine.
"""
return self.hass.async_add_job(
partial(self.start_pause, **kwargs))
def send_command(self, command, params=None, **kwargs):
"""Send a command to a vacuum cleaner."""
raise NotImplementedError()
def async_send_command(self, command, params=None, **kwargs):
"""Send a command to a vacuum cleaner.
This method must be run in the event loop and returns a coroutine.
"""
return self.hass.async_add_job(
partial(self.send_command, command, params=params, **kwargs))

View file

@ -0,0 +1,203 @@
"""
Demo platform for the vacuum component.
For more details about this platform, please refer to the documentation
https://home-assistant.io/components/demo/
"""
import logging
from homeassistant.components.vacuum import (
ATTR_CLEANED_AREA, DEFAULT_ICON, SUPPORT_BATTERY, SUPPORT_FAN_SPEED,
SUPPORT_LOCATE, SUPPORT_PAUSE, SUPPORT_RETURN_HOME, SUPPORT_SEND_COMMAND,
SUPPORT_STATUS, SUPPORT_STOP, SUPPORT_TURN_OFF, SUPPORT_TURN_ON,
VacuumDevice)
_LOGGER = logging.getLogger(__name__)
SUPPORT_MINIMAL_SERVICES = SUPPORT_TURN_ON | SUPPORT_TURN_OFF
SUPPORT_BASIC_SERVICES = SUPPORT_TURN_ON | SUPPORT_TURN_OFF | \
SUPPORT_STATUS | SUPPORT_BATTERY
SUPPORT_MOST_SERVICES = SUPPORT_TURN_ON | SUPPORT_TURN_OFF | SUPPORT_STOP | \
SUPPORT_RETURN_HOME | SUPPORT_STATUS | SUPPORT_BATTERY
SUPPORT_ALL_SERVICES = SUPPORT_TURN_ON | SUPPORT_TURN_OFF | SUPPORT_PAUSE | \
SUPPORT_STOP | SUPPORT_RETURN_HOME | \
SUPPORT_FAN_SPEED | SUPPORT_SEND_COMMAND | \
SUPPORT_LOCATE | SUPPORT_STATUS | SUPPORT_BATTERY
FAN_SPEEDS = ['min', 'medium', 'high', 'max']
DEMO_VACUUM_COMPLETE = '0_Ground_floor'
DEMO_VACUUM_MOST = '1_First_floor'
DEMO_VACUUM_BASIC = '2_Second_floor'
DEMO_VACUUM_MINIMAL = '3_Third_floor'
def setup_platform(hass, config, add_devices, discovery_info=None):
"""Set up the Demo vacuums."""
add_devices([
DemoVacuum(DEMO_VACUUM_COMPLETE, SUPPORT_ALL_SERVICES),
DemoVacuum(DEMO_VACUUM_MOST, SUPPORT_MOST_SERVICES),
DemoVacuum(DEMO_VACUUM_BASIC, SUPPORT_BASIC_SERVICES),
DemoVacuum(DEMO_VACUUM_MINIMAL, SUPPORT_MINIMAL_SERVICES),
])
class DemoVacuum(VacuumDevice):
"""Representation of a demo vacuum."""
# pylint: disable=no-self-use
def __init__(self, name, supported_features=None):
"""Initialize the vacuum."""
self._name = name
self._supported_features = supported_features
self._state = False
self._status = 'Charging'
self._fan_speed = FAN_SPEEDS[1]
self._cleaned_area = 0
self._battery_level = 100
@property
def name(self):
"""Return the name of the vacuum."""
return self._name
@property
def icon(self):
"""Return the icon for the vacuum."""
return DEFAULT_ICON
@property
def should_poll(self):
"""No polling needed for a demo vacuum."""
return False
@property
def is_on(self):
"""Return true if vacuum is on."""
return self._state
@property
def status(self):
"""Return the status of the vacuum."""
if self.supported_features & SUPPORT_STATUS == 0:
return
return self._status
@property
def fan_speed(self):
"""Return the status of the vacuum."""
if self.supported_features & SUPPORT_FAN_SPEED == 0:
return
return self._fan_speed
@property
def fan_speed_list(self):
"""Return the status of the vacuum."""
if self.supported_features & SUPPORT_FAN_SPEED == 0:
return
return FAN_SPEEDS
@property
def battery_level(self):
"""Return the status of the vacuum."""
if self.supported_features & SUPPORT_BATTERY == 0:
return
return max(0, min(100, self._battery_level))
@property
def device_state_attributes(self):
"""Return device state attributes."""
return {ATTR_CLEANED_AREA: round(self._cleaned_area, 2)}
@property
def supported_features(self):
"""Flag supported features."""
if self._supported_features is not None:
return self._supported_features
return super().supported_features
def turn_on(self, **kwargs):
"""Turn the vacuum on."""
if self.supported_features & SUPPORT_TURN_ON == 0:
return
self._state = True
self._cleaned_area += 5.32
self._battery_level -= 2
self._status = 'Cleaning'
self.schedule_update_ha_state()
def turn_off(self, **kwargs):
"""Turn the vacuum off."""
if self.supported_features & SUPPORT_TURN_OFF == 0:
return
self._state = False
self._status = 'Charging'
self.schedule_update_ha_state()
def stop(self, **kwargs):
"""Turn the vacuum off."""
if self.supported_features & SUPPORT_STOP == 0:
return
self._state = False
self._status = 'Stopping the current task'
self.schedule_update_ha_state()
def locate(self, **kwargs):
"""Turn the vacuum off."""
if self.supported_features & SUPPORT_LOCATE == 0:
return
self._status = "Hi, I'm over here!"
self.schedule_update_ha_state()
def start_pause(self, **kwargs):
"""Start, pause or resume the cleaning task."""
if self.supported_features & SUPPORT_PAUSE == 0:
return
self._state = not self._state
if self._state:
self._status = 'Resuming the current task'
self._cleaned_area += 1.32
self._battery_level -= 1
else:
self._status = 'Pausing the current task'
self.schedule_update_ha_state()
def set_fan_speed(self, fan_speed, **kwargs):
"""Tell the vacuum to return to its dock."""
if self.supported_features & SUPPORT_FAN_SPEED == 0:
return
if fan_speed in self.fan_speed_list:
self._fan_speed = fan_speed
self.schedule_update_ha_state()
def return_to_base(self, **kwargs):
"""Tell the vacuum to return to its dock."""
if self.supported_features & SUPPORT_RETURN_HOME == 0:
return
self._state = False
self._status = 'Returning home...'
self._battery_level += 5
self.schedule_update_ha_state()
def send_command(self, command, params=None, **kwargs):
"""Send a command to the vacuum."""
if self.supported_features & SUPPORT_SEND_COMMAND == 0:
return
self._status = 'Executing {}({})'.format(command, params)
self._state = True
self.schedule_update_ha_state()

View file

@ -0,0 +1,131 @@
turn_on:
description: Start a new cleaning task.
fields:
entity_id:
description: Name of the botvac entity.
example: 'vacuum.xiaomi_vacuum_cleaner'
turn_off:
description: Stop the current cleaning task and return to home.
fields:
entity_id:
description: Name of the botvac entity.
example: 'vacuum.xiaomi_vacuum_cleaner'
stop:
description: Stop the current cleaning task.
fields:
entity_id:
description: Name of the botvac entity.
example: 'vacuum.xiaomi_vacuum_cleaner'
locate:
description: Locate the vacuum cleaner robot.
fields:
entity_id:
description: Name of the botvac entity.
example: 'vacuum.xiaomi_vacuum_cleaner'
start_pause:
description: Start, pause, or resume the cleaning task.
fields:
entity_id:
description: Name of the botvac entity.
example: 'vacuum.xiaomi_vacuum_cleaner'
return_to_base:
description: Tell the vacuum cleaner to return to its dock.
fields:
entity_id:
description: Name of the botvac entity.
example: 'vacuum.xiaomi_vacuum_cleaner'
send_command:
description: Send a raw command to the vacuum cleaner.
fields:
entity_id:
description: Name of the botvac entity.
example: 'vacuum.xiaomi_vacuum_cleaner'
command:
description: Command to execute.
example: 'set_dnd_timer'
params:
description: Parameters for the command.
example: '[22,0,6,0]'
set_fan_speed:
description: Set the fan speed of the vacuum cleaner.
fields:
entity_id:
description: Name of the botvac entity.
example: 'vacuum.xiaomi_vacuum_cleaner'
fan_speed:
description: Platform dependent vacuum cleaner fan speed, with speed steps, like 'medium', or by percentage, between 0 and 100.
example: 'low'
xiaomi_remote_control_start:
description: Start remote control of the vacuum cleaner. You can then move it with `remote_control_move`, when done call `remote_control_stop`.
fields:
entity_id:
description: Name of the botvac entity.
example: 'vacuum.xiaomi_vacuum_cleaner'
xiaomi_remote_control_stop:
description: Stop remote control mode of the vacuum cleaner.
fields:
entity_id:
description: Name of the botvac entity.
example: 'vacuum.xiaomi_vacuum_cleaner'
xiaomi_remote_control_move:
description: Remote control the vacuum cleaner, make sure you first set it in remote control mode with `remote_control_start`.
fields:
entity_id:
description: Name of the botvac entity.
example: 'vacuum.xiaomi_vacuum_cleaner'
velocity:
description: Speed, between -0.29 and 0.29.
example: '0.2'
rotation:
description: Rotation, between -179 degrees and 179 degrees.
example: '90'
duration:
description: Duration of the movement?
example: '1500'
xiaomi_remote_control_move_step:
description: Remote control the vacuum cleaner, only makes one move and then stops.
fields:
entity_id:
description: Name of the botvac entity.
example: 'vacuum.xiaomi_vacuum_cleaner'
velocity:
description: Speed, between -0.29 and 0.29.
example: '0.2'
rotation:
description: Rotation, between -179 degrees and 179 degrees.
example: '90'
duration:
description: Duration of the movement?
example: '1500'

View file

@ -0,0 +1,354 @@
"""
Support for the Xiaomi vacuum cleaner robot.
For more details about this platform, please refer to the documentation
https://home-assistant.io/components/vacuum.xiaomi/
"""
import asyncio
from functools import partial
import logging
import os
import voluptuous as vol
from homeassistant.components.vacuum import (
ATTR_CLEANED_AREA, DEFAULT_ICON, DOMAIN, PLATFORM_SCHEMA,
SUPPORT_BATTERY, SUPPORT_FAN_SPEED, SUPPORT_LOCATE, SUPPORT_PAUSE,
SUPPORT_RETURN_HOME, SUPPORT_SEND_COMMAND,
SUPPORT_STATUS, SUPPORT_STOP, SUPPORT_TURN_OFF, SUPPORT_TURN_ON,
VACUUM_SERVICE_SCHEMA, VacuumDevice)
from homeassistant.config import load_yaml_config_file
from homeassistant.const import (
ATTR_ENTITY_ID, CONF_HOST, CONF_NAME, CONF_TOKEN, STATE_OFF, STATE_ON)
import homeassistant.helpers.config_validation as cv
REQUIREMENTS = ['python-mirobo==0.1.2']
_LOGGER = logging.getLogger(__name__)
DEFAULT_NAME = 'Xiaomi Vacuum cleaner'
ICON = DEFAULT_ICON
PLATFORM = 'xiaomi'
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_TOKEN): vol.All(str, vol.Length(min=32, max=32)),
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
}, extra=vol.ALLOW_EXTRA)
SERVICE_MOVE_REMOTE_CONTROL = 'xiaomi_remote_control_move'
SERVICE_MOVE_REMOTE_CONTROL_STEP = 'xiaomi_remote_control_move_step'
SERVICE_START_REMOTE_CONTROL = 'xiaomi_remote_control_start'
SERVICE_STOP_REMOTE_CONTROL = 'xiaomi_remote_control_stop'
FAN_SPEEDS = {
'Quiet': 38,
'Balanced': 60,
'Turbo': 77,
'Max': 90}
ATTR_CLEANING_TIME = 'cleaning_time'
ATTR_DO_NOT_DISTURB = 'do_not_disturb'
ATTR_ERROR = 'error'
ATTR_RC_DURATION = 'duration'
ATTR_RC_ROTATION = 'rotation'
ATTR_RC_VELOCITY = 'velocity'
SERVICE_SCHEMA_REMOTE_CONTROL = VACUUM_SERVICE_SCHEMA.extend({
vol.Optional(ATTR_RC_VELOCITY):
vol.All(vol.Coerce(float), vol.Clamp(min=-0.29, max=0.29)),
vol.Optional(ATTR_RC_ROTATION):
vol.All(vol.Coerce(int), vol.Clamp(min=-179, max=179)),
vol.Optional(ATTR_RC_DURATION): cv.positive_int,
})
SERVICE_TO_METHOD = {
SERVICE_START_REMOTE_CONTROL: {'method': 'async_remote_control_start'},
SERVICE_STOP_REMOTE_CONTROL: {'method': 'async_remote_control_stop'},
SERVICE_MOVE_REMOTE_CONTROL: {
'method': 'async_remote_control_move',
'schema': SERVICE_SCHEMA_REMOTE_CONTROL},
SERVICE_MOVE_REMOTE_CONTROL_STEP: {
'method': 'async_remote_control_move_step',
'schema': SERVICE_SCHEMA_REMOTE_CONTROL},
}
SUPPORT_XIAOMI = SUPPORT_TURN_ON | SUPPORT_TURN_OFF | SUPPORT_PAUSE | \
SUPPORT_STOP | SUPPORT_RETURN_HOME | SUPPORT_FAN_SPEED | \
SUPPORT_SEND_COMMAND | SUPPORT_LOCATE | \
SUPPORT_STATUS | SUPPORT_BATTERY
@asyncio.coroutine
def async_setup_platform(hass, config, async_add_devices, discovery_info=None):
"""Set up the Xiaomi vacuum cleaner robot platform."""
from mirobo import Vacuum
if PLATFORM not in hass.data:
hass.data[PLATFORM] = {}
host = config.get(CONF_HOST)
name = config.get(CONF_NAME)
token = config.get(CONF_TOKEN)
# Create handler
_LOGGER.info("Initializing with host %s (token %s...)", host, token[:5])
vacuum = Vacuum(host, token)
mirobo = MiroboVacuum(name, vacuum)
hass.data[PLATFORM][host] = mirobo
async_add_devices([mirobo], update_before_add=True)
@asyncio.coroutine
def async_service_handler(service):
"""Map services to methods on MiroboVacuum."""
method = SERVICE_TO_METHOD.get(service.service)
if not method:
return
params = {key: value for key, value in service.data.items()
if key != ATTR_ENTITY_ID}
entity_ids = service.data.get(ATTR_ENTITY_ID)
if entity_ids:
target_vacuums = [vac for vac in hass.data[PLATFORM].values()
if vac.entity_id in entity_ids]
else:
target_vacuums = hass.data[PLATFORM].values()
update_tasks = []
for vacuum in target_vacuums:
yield from getattr(vacuum, method['method'])(**params)
for vacuum in target_vacuums:
update_coro = vacuum.async_update_ha_state(True)
update_tasks.append(update_coro)
if update_tasks:
yield from asyncio.wait(update_tasks, loop=hass.loop)
descriptions = yield from hass.async_add_job(
load_yaml_config_file, os.path.join(
os.path.dirname(__file__), 'services.yaml'))
for vacuum_service in SERVICE_TO_METHOD:
schema = SERVICE_TO_METHOD[vacuum_service].get(
'schema', VACUUM_SERVICE_SCHEMA)
hass.services.async_register(
DOMAIN, vacuum_service, async_service_handler,
description=descriptions.get(vacuum_service), schema=schema)
class MiroboVacuum(VacuumDevice):
"""Representation of a Xiaomi Vacuum cleaner robot."""
def __init__(self, name, vacuum):
"""Initialize the Xiaomi vacuum cleaner robot handler."""
self._name = name
self._icon = ICON
self._vacuum = vacuum
self.vacuum_state = None
self._is_on = False
self._available = False
@property
def name(self):
"""Return the name of the device."""
return self._name
@property
def icon(self):
"""Return the icon to use for device."""
return self._icon
@property
def status(self):
"""Return the status of the vacuum cleaner."""
if self.vacuum_state is not None:
return self.vacuum_state.state
@property
def battery_level(self):
"""Return the battery level of the vacuum cleaner."""
if self.vacuum_state is not None:
return self.vacuum_state.battery
@property
def fan_speed(self):
"""Return the fan speed of the vacuum cleaner."""
if self.vacuum_state is not None:
speed = self.vacuum_state.fanspeed
if speed in FAN_SPEEDS.values():
return [key for key, value in FAN_SPEEDS.items()
if value == speed][0]
return speed
@property
def fan_speed_list(self):
"""Get the list of available fan speed steps of the vacuum cleaner."""
return list(sorted(FAN_SPEEDS.keys(), key=lambda s: FAN_SPEEDS[s]))
@property
def device_state_attributes(self):
"""Return the specific state attributes of this vacuum cleaner."""
if self.vacuum_state is not None:
attrs = {
ATTR_DO_NOT_DISTURB:
STATE_ON if self.vacuum_state.dnd else STATE_OFF,
# Not working --> 'Cleaning mode':
# STATE_ON if self.vacuum_state.in_cleaning else STATE_OFF,
ATTR_CLEANING_TIME: str(self.vacuum_state.clean_time),
ATTR_CLEANED_AREA: round(self.vacuum_state.clean_area, 2)}
if self.vacuum_state.got_error:
attrs[ATTR_ERROR] = self.vacuum_state.error
return attrs
return {}
@property
def is_on(self) -> bool:
"""Return True if entity is on."""
return self._is_on
@property
def available(self) -> bool:
"""Return True if entity is available."""
return self._available
@property
def supported_features(self):
"""Flag vacuum cleaner robot features that are supported."""
return SUPPORT_XIAOMI
@asyncio.coroutine
def _try_command(self, mask_error, func, *args, **kwargs):
"""Call a vacuum command handling error messages."""
from mirobo import VacuumException
try:
yield from self.hass.async_add_job(partial(func, *args, **kwargs))
return True
except VacuumException as ex:
_LOGGER.error(mask_error, ex)
return False
@asyncio.coroutine
def async_turn_on(self, **kwargs):
"""Turn the vacuum on."""
is_on = yield from self._try_command(
"Unable to start the vacuum: %s", self._vacuum.start)
self._is_on = is_on
@asyncio.coroutine
def async_turn_off(self, **kwargs):
"""Turn the vacuum off and return to home."""
yield from self.async_stop()
return_home = yield from self.async_return_to_base()
if return_home:
self._is_on = False
@asyncio.coroutine
def async_stop(self, **kwargs):
"""Stop the vacuum cleaner."""
yield from self._try_command(
"Unable to stop: %s", self._vacuum.stop)
@asyncio.coroutine
def async_set_fan_speed(self, fan_speed, **kwargs):
"""Set fan speed."""
if fan_speed.capitalize() in FAN_SPEEDS:
fan_speed = FAN_SPEEDS[fan_speed.capitalize()]
else:
try:
fan_speed = int(fan_speed)
except ValueError as exc:
_LOGGER.error("Fan speed step not recognized (%s). "
"Valid speeds are: %s", exc,
self.fan_speed_list)
return
yield from self._try_command(
"Unable to set fan speed: %s",
self._vacuum.set_fan_speed, fan_speed)
@asyncio.coroutine
def async_start_pause(self, **kwargs):
"""Start, pause or resume the cleaning task."""
if self.vacuum_state and self.is_on:
yield from self._try_command(
"Unable to set start/pause: %s", self._vacuum.pause)
else:
yield from self.async_turn_on()
@asyncio.coroutine
def async_return_to_base(self, **kwargs):
"""Set the vacuum cleaner to return to the dock."""
return_home = yield from self._try_command(
"Unable to return home: %s", self._vacuum.home)
if return_home:
self._is_on = False
@asyncio.coroutine
def async_locate(self, **kwargs):
"""Locate the vacuum cleaner."""
yield from self._try_command(
"Unable to locate the botvac: %s", self._vacuum.find)
@asyncio.coroutine
def async_send_command(self, command, params=None, **kwargs):
"""Send raw command."""
yield from self._try_command(
"Unable to send command to the vacuum: %s",
self._vacuum.raw_command, command, params)
@asyncio.coroutine
def async_remote_control_start(self):
"""Start remote control mode."""
yield from self._try_command(
"Unable to start remote control the vacuum: %s",
self._vacuum.manual_start)
@asyncio.coroutine
def async_remote_control_stop(self):
"""Stop remote control mode."""
yield from self._try_command(
"Unable to stop remote control the vacuum: %s",
self._vacuum.manual_stop)
@asyncio.coroutine
def async_remote_control_move(self,
rotation: int=0,
velocity: float=0.3,
duration: int=1500):
"""Move vacuum with remote control mode."""
yield from self._try_command(
"Unable to move with remote control the vacuum: %s",
self._vacuum.manual_control,
velocity=velocity, rotation=rotation, duration=duration)
@asyncio.coroutine
def async_remote_control_move_step(self,
rotation: int=0,
velocity: float=0.2,
duration: int=1500):
"""Move vacuum one step with remote control mode."""
yield from self._try_command(
"Unable to remote control the vacuum: %s",
self._vacuum.manual_control_once,
velocity=velocity, rotation=rotation, duration=duration)
@asyncio.coroutine
def async_update(self):
"""Fetch state from the device."""
from mirobo import DeviceException
try:
state = yield from self.hass.async_add_job(self._vacuum.status)
_LOGGER.debug("Got new state from the vacuum: %s", state.data)
self.vacuum_state = state
self._is_on = state.is_on
self._available = True
except DeviceException as ex:
_LOGGER.warning("Got exception while fetching the state: %s", ex)
# self._available = False
except OSError as ex:
_LOGGER.error("Got exception while fetching the state: %s", ex)
# self._available = False

View file

@ -283,6 +283,9 @@ ATTR_WAKEUP = 'wake_up_interval'
ATTR_CODE = 'code'
ATTR_CODE_FORMAT = 'code_format'
# For calling a device specific command
ATTR_COMMAND = 'command'
# For devices which support an armed state
ATTR_ARMED = 'device_armed'

View file

@ -0,0 +1,18 @@
"""Icon util methods."""
from typing import Optional
def icon_for_battery_level(battery_level: Optional[int]=None,
charging: bool=False) -> str:
"""Return a battery icon valid identifier."""
icon = 'mdi:battery'
if battery_level is None:
return icon + '-unknown'
if charging and battery_level > 10:
icon += '-charging-{}'.format(
int(round(battery_level / 20 - .01)) * 20)
elif charging or battery_level <= 5:
icon += '-outline'
elif 5 < battery_level < 95:
icon += '-{}'.format(int(round(battery_level / 10 - .01)) * 10)
return icon

View file

@ -728,7 +728,7 @@ python-juicenet==0.0.5
# homeassistant.components.lirc
# python-lirc==1.2.3
# homeassistant.components.switch.xiaomi_vacuum
# homeassistant.components.vacuum.xiaomi
python-mirobo==0.1.2
# homeassistant.components.media_player.mpd

View file

@ -0,0 +1 @@
"""The tests for vacuum platforms."""

View file

@ -0,0 +1,210 @@
"""The tests for the Demo vacuum platform."""
import unittest
from homeassistant.components import vacuum
from homeassistant.components.vacuum import (
ATTR_BATTERY_LEVEL, ATTR_COMMAND, ATTR_ENTITY_ID, ATTR_FAN_SPEED,
ATTR_FAN_SPEED_LIST, ATTR_PARAMS, ATTR_STATUS, DOMAIN,
ENTITY_ID_ALL_VACUUMS,
SERVICE_SEND_COMMAND, SERVICE_SET_FAN_SPEED)
from homeassistant.components.vacuum.demo import (
DEMO_VACUUM_BASIC, DEMO_VACUUM_COMPLETE, DEMO_VACUUM_MINIMAL,
DEMO_VACUUM_MOST, FAN_SPEEDS)
from homeassistant.const import (
ATTR_SUPPORTED_FEATURES, CONF_PLATFORM, STATE_OFF, STATE_ON)
from homeassistant.setup import setup_component
from tests.common import get_test_home_assistant, mock_service
ENTITY_VACUUM_BASIC = '{}.{}'.format(DOMAIN, DEMO_VACUUM_BASIC).lower()
ENTITY_VACUUM_COMPLETE = '{}.{}'.format(DOMAIN, DEMO_VACUUM_COMPLETE).lower()
ENTITY_VACUUM_MINIMAL = '{}.{}'.format(DOMAIN, DEMO_VACUUM_MINIMAL).lower()
ENTITY_VACUUM_MOST = '{}.{}'.format(DOMAIN, DEMO_VACUUM_MOST).lower()
class TestVacuumDemo(unittest.TestCase):
"""Test the Demo vacuum."""
def setUp(self): # pylint: disable=invalid-name
"""Setup things to be run when tests are started."""
self.hass = get_test_home_assistant()
self.assertTrue(setup_component(
self.hass, DOMAIN, {DOMAIN: {CONF_PLATFORM: 'demo'}}))
def tearDown(self): # pylint: disable=invalid-name
"""Stop down everything that was started."""
self.hass.stop()
def test_supported_features(self):
"""Test vacuum supported features."""
state = self.hass.states.get(ENTITY_VACUUM_COMPLETE)
self.assertEqual(1023, state.attributes.get(ATTR_SUPPORTED_FEATURES))
self.assertEqual("Charging", state.attributes.get(ATTR_STATUS))
self.assertEqual(100, state.attributes.get(ATTR_BATTERY_LEVEL))
self.assertEqual("medium", state.attributes.get(ATTR_FAN_SPEED))
self.assertListEqual(FAN_SPEEDS,
state.attributes.get(ATTR_FAN_SPEED_LIST))
self.assertEqual(STATE_OFF, state.state)
state = self.hass.states.get(ENTITY_VACUUM_MOST)
self.assertEqual(219, state.attributes.get(ATTR_SUPPORTED_FEATURES))
self.assertEqual("Charging", state.attributes.get(ATTR_STATUS))
self.assertEqual(100, state.attributes.get(ATTR_BATTERY_LEVEL))
self.assertEqual(None, state.attributes.get(ATTR_FAN_SPEED))
self.assertEqual(None, state.attributes.get(ATTR_FAN_SPEED_LIST))
self.assertEqual(STATE_OFF, state.state)
state = self.hass.states.get(ENTITY_VACUUM_BASIC)
self.assertEqual(195, state.attributes.get(ATTR_SUPPORTED_FEATURES))
self.assertEqual("Charging", state.attributes.get(ATTR_STATUS))
self.assertEqual(100, state.attributes.get(ATTR_BATTERY_LEVEL))
self.assertEqual(None, state.attributes.get(ATTR_FAN_SPEED))
self.assertEqual(None, state.attributes.get(ATTR_FAN_SPEED_LIST))
self.assertEqual(STATE_OFF, state.state)
state = self.hass.states.get(ENTITY_VACUUM_MINIMAL)
self.assertEqual(3, state.attributes.get(ATTR_SUPPORTED_FEATURES))
self.assertEqual(None, state.attributes.get(ATTR_STATUS))
self.assertEqual(None, state.attributes.get(ATTR_BATTERY_LEVEL))
self.assertEqual(None, state.attributes.get(ATTR_FAN_SPEED))
self.assertEqual(None, state.attributes.get(ATTR_FAN_SPEED_LIST))
self.assertEqual(STATE_OFF, state.state)
def test_methods(self):
"""Test if methods call the services as expected."""
self.hass.states.set(ENTITY_VACUUM_BASIC, STATE_ON)
self.assertTrue(vacuum.is_on(self.hass, ENTITY_VACUUM_BASIC))
self.hass.states.set(ENTITY_VACUUM_BASIC, STATE_OFF)
self.assertFalse(vacuum.is_on(self.hass, ENTITY_VACUUM_BASIC))
self.hass.states.set(ENTITY_ID_ALL_VACUUMS, STATE_ON)
self.assertTrue(vacuum.is_on(self.hass))
self.hass.states.set(ENTITY_ID_ALL_VACUUMS, STATE_OFF)
self.assertFalse(vacuum.is_on(self.hass))
vacuum.turn_on(self.hass, ENTITY_VACUUM_COMPLETE)
self.hass.block_till_done()
self.assertTrue(vacuum.is_on(self.hass, ENTITY_VACUUM_COMPLETE))
vacuum.turn_off(self.hass, ENTITY_VACUUM_COMPLETE)
self.hass.block_till_done()
self.assertFalse(vacuum.is_on(self.hass, ENTITY_VACUUM_COMPLETE))
vacuum.toggle(self.hass, ENTITY_VACUUM_COMPLETE)
self.hass.block_till_done()
self.assertTrue(vacuum.is_on(self.hass, ENTITY_VACUUM_COMPLETE))
vacuum.start_pause(self.hass, ENTITY_VACUUM_COMPLETE)
self.hass.block_till_done()
self.assertFalse(vacuum.is_on(self.hass, ENTITY_VACUUM_COMPLETE))
vacuum.start_pause(self.hass, ENTITY_VACUUM_COMPLETE)
self.hass.block_till_done()
self.assertTrue(vacuum.is_on(self.hass, ENTITY_VACUUM_COMPLETE))
vacuum.stop(self.hass, ENTITY_VACUUM_COMPLETE)
self.hass.block_till_done()
self.assertFalse(vacuum.is_on(self.hass, ENTITY_VACUUM_COMPLETE))
state = self.hass.states.get(ENTITY_VACUUM_COMPLETE)
self.assertLess(state.attributes.get(ATTR_BATTERY_LEVEL), 100)
self.assertNotEqual("Charging", state.attributes.get(ATTR_STATUS))
vacuum.locate(self.hass, ENTITY_VACUUM_COMPLETE)
self.hass.block_till_done()
state = self.hass.states.get(ENTITY_VACUUM_COMPLETE)
self.assertIn("I'm over here", state.attributes.get(ATTR_STATUS))
vacuum.return_to_base(self.hass, ENTITY_VACUUM_COMPLETE)
self.hass.block_till_done()
state = self.hass.states.get(ENTITY_VACUUM_COMPLETE)
self.assertIn("Returning home", state.attributes.get(ATTR_STATUS))
vacuum.set_fan_speed(self.hass, FAN_SPEEDS[-1],
entity_id=ENTITY_VACUUM_COMPLETE)
self.hass.block_till_done()
state = self.hass.states.get(ENTITY_VACUUM_COMPLETE)
self.assertEqual(FAN_SPEEDS[-1], state.attributes.get(ATTR_FAN_SPEED))
def test_services(self):
"""Test vacuum services."""
# Test send_command
send_command_calls = mock_service(
self.hass, DOMAIN, SERVICE_SEND_COMMAND)
params = {"rotate": 150, "speed": 20}
vacuum.send_command(
self.hass, 'test_command', entity_id=ENTITY_VACUUM_BASIC,
params=params)
self.hass.block_till_done()
self.assertEqual(1, len(send_command_calls))
call = send_command_calls[-1]
self.assertEqual(DOMAIN, call.domain)
self.assertEqual(SERVICE_SEND_COMMAND, call.service)
self.assertEqual(ENTITY_VACUUM_BASIC, call.data[ATTR_ENTITY_ID])
self.assertEqual('test_command', call.data[ATTR_COMMAND])
self.assertEqual(params, call.data[ATTR_PARAMS])
# Test set fan speed
set_fan_speed_calls = mock_service(
self.hass, DOMAIN, SERVICE_SET_FAN_SPEED)
vacuum.set_fan_speed(
self.hass, FAN_SPEEDS[0], entity_id=ENTITY_VACUUM_COMPLETE)
self.hass.block_till_done()
self.assertEqual(1, len(set_fan_speed_calls))
call = set_fan_speed_calls[-1]
self.assertEqual(DOMAIN, call.domain)
self.assertEqual(SERVICE_SET_FAN_SPEED, call.service)
self.assertEqual(ENTITY_VACUUM_COMPLETE, call.data[ATTR_ENTITY_ID])
self.assertEqual(FAN_SPEEDS[0], call.data[ATTR_FAN_SPEED])
def test_set_fan_speed(self):
"""Test vacuum service to set the fan speed."""
group_vacuums = ','.join([ENTITY_VACUUM_BASIC,
ENTITY_VACUUM_COMPLETE])
old_state_basic = self.hass.states.get(ENTITY_VACUUM_BASIC)
old_state_complete = self.hass.states.get(ENTITY_VACUUM_COMPLETE)
vacuum.set_fan_speed(
self.hass, FAN_SPEEDS[0], entity_id=group_vacuums)
self.hass.block_till_done()
new_state_basic = self.hass.states.get(ENTITY_VACUUM_BASIC)
new_state_complete = self.hass.states.get(ENTITY_VACUUM_COMPLETE)
self.assertEqual(old_state_basic, new_state_basic)
self.assertNotIn(ATTR_FAN_SPEED, new_state_basic.attributes)
self.assertNotEqual(old_state_complete, new_state_complete)
self.assertEqual(FAN_SPEEDS[1],
old_state_complete.attributes[ATTR_FAN_SPEED])
self.assertEqual(FAN_SPEEDS[0],
new_state_complete.attributes[ATTR_FAN_SPEED])
def test_send_command(self):
"""Test vacuum service to send a command."""
group_vacuums = ','.join([ENTITY_VACUUM_BASIC,
ENTITY_VACUUM_COMPLETE])
old_state_basic = self.hass.states.get(ENTITY_VACUUM_BASIC)
old_state_complete = self.hass.states.get(ENTITY_VACUUM_COMPLETE)
vacuum.send_command(
self.hass, 'test_command', params={"p1": 3},
entity_id=group_vacuums)
self.hass.block_till_done()
new_state_basic = self.hass.states.get(ENTITY_VACUUM_BASIC)
new_state_complete = self.hass.states.get(ENTITY_VACUUM_COMPLETE)
self.assertEqual(old_state_basic, new_state_basic)
self.assertNotEqual(old_state_complete, new_state_complete)
self.assertEqual(STATE_ON, new_state_complete.state)
self.assertEqual("Executing test_command({'p1': 3})",
new_state_complete.attributes[ATTR_STATUS])

View file

@ -0,0 +1,154 @@
"""The tests for the Xiaomi vacuum platform."""
import asyncio
from datetime import timedelta
from unittest import mock
import pytest
from homeassistant.components.vacuum import (
ATTR_BATTERY_ICON,
ATTR_FAN_SPEED, ATTR_FAN_SPEED_LIST, DOMAIN,
SERVICE_LOCATE, SERVICE_RETURN_TO_BASE, SERVICE_SEND_COMMAND,
SERVICE_SET_FAN_SPEED, SERVICE_STOP,
SERVICE_TOGGLE, SERVICE_TURN_OFF, SERVICE_TURN_ON)
from homeassistant.components.vacuum.xiaomi import (
ATTR_CLEANED_AREA, ATTR_CLEANING_TIME, ATTR_DO_NOT_DISTURB, ATTR_ERROR,
CONF_HOST, CONF_NAME, CONF_TOKEN, PLATFORM,
SERVICE_MOVE_REMOTE_CONTROL, SERVICE_MOVE_REMOTE_CONTROL_STEP,
SERVICE_START_REMOTE_CONTROL, SERVICE_STOP_REMOTE_CONTROL)
from homeassistant.const import (
ATTR_SUPPORTED_FEATURES, CONF_PLATFORM, STATE_OFF, STATE_ON)
from homeassistant.setup import async_setup_component
@pytest.fixture
def mock_mirobo():
"""Mock mock_mirobo."""
mock_vacuum = mock.MagicMock()
mock_vacuum.Vacuum().status().data = {'test': 'raw'}
mock_vacuum.Vacuum().status().is_on = False
mock_vacuum.Vacuum().status().fanspeed = 38
mock_vacuum.Vacuum().status().got_error = False
mock_vacuum.Vacuum().status().dnd = True
mock_vacuum.Vacuum().status().battery = 82
mock_vacuum.Vacuum().status().clean_area = 123.43218
mock_vacuum.Vacuum().status().clean_time = timedelta(
hours=2, minutes=35, seconds=34)
mock_vacuum.Vacuum().status().state = 'Test Xiaomi Charging'
with mock.patch.dict('sys.modules', {
'mirobo': mock_vacuum,
}):
yield mock_vacuum
@asyncio.coroutine
def test_xiaomi_vacuum(hass, caplog, mock_mirobo):
"""Test vacuum supported features."""
entity_name = 'test_vacuum_cleaner'
entity_id = '{}.{}'.format(DOMAIN, entity_name)
yield from async_setup_component(
hass, DOMAIN,
{DOMAIN: {CONF_PLATFORM: PLATFORM,
CONF_HOST: '127.0.0.1',
CONF_NAME: entity_name,
CONF_TOKEN: '12345678901234567890123456789012'}})
assert 'Initializing with host 127.0.0.1 (token 12345...)' in caplog.text
# Check state attributes
state = hass.states.get(entity_id)
assert state.state == STATE_OFF
assert state.attributes.get(ATTR_SUPPORTED_FEATURES) == 1023
assert state.attributes.get(ATTR_DO_NOT_DISTURB) == STATE_ON
assert state.attributes.get(ATTR_ERROR) is None
assert (state.attributes.get(ATTR_BATTERY_ICON)
== 'mdi:battery-charging-80')
assert state.attributes.get(ATTR_CLEANING_TIME) == '2:35:34'
assert state.attributes.get(ATTR_CLEANED_AREA) == 123.43
assert state.attributes.get(ATTR_FAN_SPEED) == 'Quiet'
assert (state.attributes.get(ATTR_FAN_SPEED_LIST)
== ['Quiet', 'Balanced', 'Turbo', 'Max'])
# Call services
yield from hass.services.async_call(
DOMAIN, SERVICE_TURN_ON, blocking=True)
assert str(mock_mirobo.mock_calls[-2]) == 'call.Vacuum().start()'
assert str(mock_mirobo.mock_calls[-1]) == 'call.Vacuum().status()'
yield from hass.services.async_call(
DOMAIN, SERVICE_TURN_OFF, blocking=True)
assert str(mock_mirobo.mock_calls[-2]) == 'call.Vacuum().home()'
assert str(mock_mirobo.mock_calls[-1]) == 'call.Vacuum().status()'
yield from hass.services.async_call(
DOMAIN, SERVICE_TOGGLE, blocking=True)
assert str(mock_mirobo.mock_calls[-2]) == 'call.Vacuum().start()'
assert str(mock_mirobo.mock_calls[-1]) == 'call.Vacuum().status()'
yield from hass.services.async_call(
DOMAIN, SERVICE_STOP, blocking=True)
assert str(mock_mirobo.mock_calls[-2]) == 'call.Vacuum().stop()'
assert str(mock_mirobo.mock_calls[-1]) == 'call.Vacuum().status()'
yield from hass.services.async_call(
DOMAIN, SERVICE_RETURN_TO_BASE, blocking=True)
assert str(mock_mirobo.mock_calls[-2]) == 'call.Vacuum().home()'
assert str(mock_mirobo.mock_calls[-1]) == 'call.Vacuum().status()'
yield from hass.services.async_call(
DOMAIN, SERVICE_LOCATE, blocking=True)
assert str(mock_mirobo.mock_calls[-2]) == 'call.Vacuum().find()'
assert str(mock_mirobo.mock_calls[-1]) == 'call.Vacuum().status()'
yield from hass.services.async_call(
DOMAIN, SERVICE_SET_FAN_SPEED, {"fan_speed": 60}, blocking=True)
assert (str(mock_mirobo.mock_calls[-2])
== 'call.Vacuum().set_fan_speed(60)')
assert str(mock_mirobo.mock_calls[-1]) == 'call.Vacuum().status()'
yield from hass.services.async_call(
DOMAIN, SERVICE_SEND_COMMAND,
{"command": "raw"}, blocking=True)
assert (str(mock_mirobo.mock_calls[-2])
== "call.Vacuum().raw_command('raw', None)")
assert str(mock_mirobo.mock_calls[-1]) == 'call.Vacuum().status()'
yield from hass.services.async_call(
DOMAIN, SERVICE_SEND_COMMAND,
{"command": "raw", "params": {"k1": 2}}, blocking=True)
assert (str(mock_mirobo.mock_calls[-2])
== "call.Vacuum().raw_command('raw', {'k1': 2})")
assert str(mock_mirobo.mock_calls[-1]) == 'call.Vacuum().status()'
yield from hass.services.async_call(
DOMAIN, SERVICE_START_REMOTE_CONTROL, {}, blocking=True)
assert (str(mock_mirobo.mock_calls[-2])
== "call.Vacuum().manual_start()")
assert str(mock_mirobo.mock_calls[-1]) == 'call.Vacuum().status()'
yield from hass.services.async_call(
DOMAIN, SERVICE_MOVE_REMOTE_CONTROL,
{"duration": 1000, "rotation": -40, "velocity": -0.1}, blocking=True)
assert 'call.Vacuum().manual_control(' in str(mock_mirobo.mock_calls[-2])
assert 'duration=1000' in str(mock_mirobo.mock_calls[-2])
assert 'rotation=-40' in str(mock_mirobo.mock_calls[-2])
assert 'velocity=-0.1' in str(mock_mirobo.mock_calls[-2])
assert str(mock_mirobo.mock_calls[-1]) == 'call.Vacuum().status()'
yield from hass.services.async_call(
DOMAIN, SERVICE_STOP_REMOTE_CONTROL, {}, blocking=True)
assert (str(mock_mirobo.mock_calls[-2])
== "call.Vacuum().manual_stop()")
assert str(mock_mirobo.mock_calls[-1]) == 'call.Vacuum().status()'
yield from hass.services.async_call(
DOMAIN, SERVICE_MOVE_REMOTE_CONTROL_STEP,
{"duration": 2000, "rotation": 120, "velocity": 0.1}, blocking=True)
assert ('call.Vacuum().manual_control_once('
in str(mock_mirobo.mock_calls[-2]))
assert 'duration=2000' in str(mock_mirobo.mock_calls[-2])
assert 'rotation=120' in str(mock_mirobo.mock_calls[-2])
assert 'velocity=0.1' in str(mock_mirobo.mock_calls[-2])

53
tests/util/test_icon.py Normal file
View file

@ -0,0 +1,53 @@
"""Test Home Assistant icon util methods."""
import unittest
class TestIconUtil(unittest.TestCase):
"""Test icon util methods."""
def test_battery_icon(self):
"""Test icon generator for battery sensor."""
from homeassistant.util.icon import icon_for_battery_level
self.assertEqual('mdi:battery-unknown',
icon_for_battery_level(None, True))
self.assertEqual('mdi:battery-unknown',
icon_for_battery_level(None, False))
self.assertEqual('mdi:battery-outline',
icon_for_battery_level(5, True))
self.assertEqual('mdi:battery-outline',
icon_for_battery_level(5, False))
self.assertEqual('mdi:battery-charging-100',
icon_for_battery_level(100, True))
self.assertEqual('mdi:battery',
icon_for_battery_level(100, False))
iconbase = 'mdi:battery'
for level in range(0, 100, 5):
print('Level: %d. icon: %s, charging: %s'
% (level, icon_for_battery_level(level, False),
icon_for_battery_level(level, True)))
if level <= 10:
postfix_charging = '-outline'
elif level <= 30:
postfix_charging = '-charging-20'
elif level <= 50:
postfix_charging = '-charging-40'
elif level <= 70:
postfix_charging = '-charging-60'
elif level <= 90:
postfix_charging = '-charging-80'
else:
postfix_charging = '-charging-100'
if 5 < level < 95:
postfix = '-{}'.format(int(round(level / 10 - .01)) * 10)
elif level <= 5:
postfix = '-outline'
else:
postfix = ''
self.assertEqual(iconbase + postfix,
icon_for_battery_level(level, False))
self.assertEqual(iconbase + postfix_charging,
icon_for_battery_level(level, True))