Merge branch 'dev' of https://github.com/balloob/home-assistant into scheduler

# By Paulus Schoutsen
# Via Paulus Schoutsen
* 'dev' of https://github.com/balloob/home-assistant: (51 commits)
  Light test tests light profile loading
  Loader test tests now custom component loading
  Default config dir is now working_dir/config
  Add sun component test for state change
  Tweak light test to create correct exception
  Better light.xy_color parsing
  Added light component test coverage
  Renamed mock_switch_platform to mock_toggledevice_platform
  Expanded switch test to push it to 100% coverage
  Fix to make tests work on Travis CI
  Added tests for switch component
  Clean up code sun component tests
  Added test coverage for sun component
  Minor fix for Chromecast component
  Cleaned up tests a bit
  Added initial Chromecast test coverage
  Final test added to get to 100% coverage for groups
  Extended group tests
  Added group component tests
  Reorganized testing
  ...
This commit is contained in:
Gustav Ahlberg 2014-11-26 20:44:37 +01:00
commit 09908f5780
52 changed files with 2427 additions and 16575 deletions

3
.gitmodules vendored
View file

@ -4,6 +4,3 @@
[submodule "homeassistant/external/pywemo"]
path = homeassistant/external/pywemo
url = https://github.com/balloob/pywemo.git
[submodule "homeassistant/external/phue"]
path = homeassistant/external/phue
url = https://github.com/studioimaginaire/phue.git

View file

@ -3,8 +3,10 @@ python:
- "3.4"
install:
- pip install -r requirements.txt
- pip install pep8 pylint
- pip install flake8 pylint coveralls
script:
- pep8 homeassistant --exclude bower_components,external
- flake8 homeassistant --exclude bower_components,external
- pylint homeassistant
- python -m homeassistant -t test
- coverage run --source=homeassistant -m unittest discover test
after_success:
- coveralls

View file

@ -13,15 +13,17 @@ Contains the code to interact with WeMo switches. Called if type=wemo in switch
**homeassistant/components/switch/tellstick.py**
Contains the code to interact with Tellstick switches. Called if type=tellstick in switch config.
If a component exists, your job is easy. Have a look at how the component works with other platforms and create a similar file for the platform that you would like to add.If you cannot find a suitable component, you'll have to add it yourself. When writing a component try to structure it after the Switch component to maximize reusability.
If a component exists, your job is easy. Have a look at how the component works with other platforms and create a similar file for the platform that you would like to add. If you cannot find a suitable component, you'll have to add it yourself. When writing a component try to structure it after the Switch component to maximize reusability.
Communication between Home Assistant and devices should happen via third-party libraries that implement the device API. This will make sure the platform support code stays as small as possible.
For help on building your component, please see the See the documentation on [further customizing Home Assistant](https://github.com/balloob/home-assistant#further-customizing-home-assistant).
After you finish adding support for your device:
- update the supported devices in README.md.
- add any new dependencies to requirements.txt.
- Make sure all your code passes Pylint and PEP8 validation. To generate reports, run `pylint homeassistant > pylint.txt` and `pep8 homeassistant --exclude bower_components,external > pep8.txt`.
- Make sure all your code passes Pylint, flake8 (PEP8 and some more) validation. To generate reports, run `pylint homeassistant > pylint.txt` and `flake8 homeassistant --exclude bower_components,external > flake8.txt`.
If you've added a component:

View file

@ -1,4 +1,4 @@
# Home Assistant [![Build Status](https://travis-ci.org/balloob/home-assistant.svg?branch=master)](https://travis-ci.org/balloob/home-assistant)
# Home Assistant [![Build Status](https://travis-ci.org/balloob/home-assistant.svg?branch=master)](https://travis-ci.org/balloob/home-assistant) [![Coverage Status](https://img.shields.io/coveralls/balloob/home-assistant.svg)](https://coveralls.io/r/balloob/home-assistant?branch=master)
Home Assistant is a home automation platform running on Python 3. The goal of Home Assistant is to be able to track and control all devices at home and offer a platform for automating control.
@ -27,9 +27,11 @@ Home Assistant also includes functionality for controlling HTPCs:
The system is built modular so support for other devices or actions can be implemented easily. See also the [section on architecture](#architecture) and the [section on customizing](#customizing).
If you run into issues while using Home Assistant or during development of a component, reach out to the [Home Assistant developer community](https://groups.google.com/forum/#!forum/home-assistant-dev).
## Installation instructions / Quick-start guide
Running Home Assistant requires that python3 and the packages pyephem and requests are installed.
Running Home Assistant requires that python3 and the package requests are installed.
Run the following code to get up and running with the minimum setup:
@ -51,13 +53,15 @@ docker run -d --name="home-assistant" -v /path/to/homeassistant/config:/config -
After you got the demo mode running it is time to enable some real components and get started. An example configuration file has been provided in [/config/home-assistant.conf.example](https://github.com/balloob/home-assistant/blob/master/config/home-assistant.conf.example).
*Note:* you can append `?api_password=YOUR_PASSWORD` to the url of the web interface to log in automatically.
### Philips Hue
To get Philips Hue working you will have to connect Home Assistant to the Hue bridge.
Run the following command from your config dir and follow the instructions:
```bash
python -m phue --host HUE_BRIDGE_IP_ADDRESS --config-file-path phue.conf
python3 -m phue --host HUE_BRIDGE_IP_ADDRESS --config-file-path phue.conf
```
After that add the following lines to your `home-assistant.conf`:
@ -88,21 +92,17 @@ Once tracking the `device_tracker` component will maintain a file in your config
<a name='customizing'></a>
## Further customizing Home Assistant
If you run into issues while developing your component, reach out to the [Home Assistant developer community](https://groups.google.com/forum/#!forum/home-assistant-dev).
Home Assistant can be extended by components. Components can listen for- or trigger events and offer services. Components are written in Python and can do all the goodness that Python has to offer.
Home Assistant offers [built-in components](#components) but it is easy to built your own. An example component can be found in [`/config/custom_components/example.py`](https://github.com/balloob/home-assistant/blob/master/config/custom_components/example.py).
*Note:* Home Assistant will use the directory that contains your config file as the directory that holds your customizations. By default this is the `./config` folder but this can be placed anywhere on the filesystem.
*Note:* Home Assistant will use the directory that contains your config file as the directory that holds your customizations. By default this is the `./config` folder but this can be pointed anywhere on the filesystem by using the `--config /YOUR/CONFIG/PATH/` argument.
A component will be loaded on start if a section (ie. `[light]`) for it exists in the config file or a module that depends on the component is loaded. When loading a component Home Assistant will check the following paths:
* &lt;config file directory>/custom_components/&lt;component name>.py
* homeassistant/components/&lt;component name>.py (built-in components)
Upon loading of a component it will be validated to see if the required fields (`DOMAIN`, `DEPENDENCIES`) and required method ( `setup(hass, config)` ) are available.
Once loaded, a component will only be setup if all dependencies can be loaded and are able to setup. Keep an eye on the logs to see if loading and setup of your component went well.
*Warning:* You can override a built-in component by offering a component with the same name in your custom_components folder. This is not recommended and may lead to unexpected behavior!
@ -133,6 +133,8 @@ host=paulusschoutsen.nl
Then in the setup-method of your component you will be able to refer to `config[example][host]` to get the value `paulusschoutsen.nl`.
If you want to get your component included with the Home Assistant distribution, please take a look at the [contributing page](https://github.com/balloob/home-assistant/blob/master/CONTRIBUTING.md).
<a name="architecture"></a>
## Architecture

View file

@ -9,7 +9,8 @@ cp polymer/bower_components/webcomponentsjs/webcomponents.min.js .
# Let Polymer refer to the minified JS version before we compile
sed -i.bak 's/polymer\.js/polymer\.min\.js/' polymer/bower_components/polymer/polymer.html
vulcanize -o frontend.html --inline polymer/splash-login.html
vulcanize -o frontend.html --inline --strip polymer/splash-login.html
# Revert back the change to the Polymer component
rm polymer/bower_components/polymer/polymer.html

Binary file not shown.

Before

Width:  |  Height:  |  Size: 160 KiB

After

Width:  |  Height:  |  Size: 136 KiB

Before After
Before After

View file

@ -6,7 +6,6 @@ Home Assistant is a Home Automation framework for observing the state
of entities and react to changes.
"""
import sys
import os
import time
import logging
@ -25,6 +24,7 @@ DOMAIN = "homeassistant"
SERVICE_HOMEASSISTANT_STOP = "stop"
EVENT_HOMEASSISTANT_START = "homeassistant_start"
EVENT_HOMEASSISTANT_STOP = "homeassistant_stop"
EVENT_STATE_CHANGED = "state_changed"
EVENT_TIME_CHANGED = "time_changed"
EVENT_CALL_SERVICE = "call_service"
@ -63,7 +63,7 @@ class HomeAssistant(object):
self.services = ServiceRegistry(self.bus, pool)
self.states = StateMachine(self.bus)
self.config_dir = os.getcwd()
self.config_dir = os.path.join(os.getcwd(), 'config')
def get_config_path(self, path):
""" Returns path to the file within the config dir. """
@ -90,6 +90,8 @@ class HomeAssistant(object):
except KeyboardInterrupt:
break
self.stop()
def call_service(self, domain, service, service_data=None):
""" Fires event to call specified service. """
event_data = service_data or {}
@ -108,7 +110,11 @@ class HomeAssistant(object):
def track_state_change(self, entity_ids, action,
from_state=None, to_state=None):
""" Track specific state changes. """
"""
Track specific state changes.
entity_ids, from_state and to_state can be string or list.
Use list to match multiple.
"""
from_state = _process_match_param(from_state)
to_state = _process_match_param(to_state)
@ -131,14 +137,16 @@ class HomeAssistant(object):
self.bus.listen(EVENT_STATE_CHANGED, state_listener)
def track_point_in_time(self, action, point_in_time):
""" Adds a listener that fires once after a spefic point in time. """
"""
Adds a listener that fires once at or after a spefic point in time.
"""
@ft.wraps(action)
def point_in_time_listener(event):
""" Listens for matching time_changed events. """
now = event.data[ATTR_NOW]
if now > point_in_time and \
if now >= point_in_time and \
not hasattr(point_in_time_listener, 'run'):
# Set variable so that we will never run twice.
@ -219,10 +227,21 @@ class HomeAssistant(object):
self.bus.listen(event_type, onetime_listener)
def stop(self):
""" Stops Home Assistant and shuts down all threads. """
_LOGGER.info("Stopping")
self.bus.fire(EVENT_HOMEASSISTANT_STOP)
# Wait till all responses to homeassistant_stop are done
self._pool.block_till_done()
self._pool.stop()
def _process_match_param(parameter):
""" Wraps parameter in a list if it is not one and returns it. """
if not parameter:
if not parameter or parameter == MATCH_ALL:
return MATCH_ALL
elif isinstance(parameter, list):
return parameter
@ -240,7 +259,7 @@ def _matcher(subject, pattern):
class JobPriority(util.OrderedEnum):
""" Provides priorities for bus events. """
# pylint: disable=no-init
# pylint: disable=no-init,too-few-public-methods
EVENT_SERVICE = 1
EVENT_STATE = 2
@ -289,7 +308,7 @@ def create_worker_pool(thread_count=POOL_NUM_THREAD):
class EventOrigin(enum.Enum):
""" Distinguish between origin of event. """
# pylint: disable=no-init
# pylint: disable=no-init,too-few-public-methods
local = "LOCAL"
remote = "REMOTE"
@ -313,11 +332,11 @@ class Event(object):
# pylint: disable=maybe-no-member
if self.data:
return "<Event {}[{}]: {}>".format(
self.event_type, self.origin.value[0],
self.event_type, str(self.origin)[0],
util.repr_helper(self.data))
else:
return "<Event {}[{}]>".format(self.event_type,
self.origin.value[0])
str(self.origin)[0])
class EventBus(object):
@ -381,9 +400,9 @@ class EventBus(object):
if not self._listeners[event_type]:
self._listeners.pop(event_type)
except (KeyError, AttributeError):
except (KeyError, ValueError):
# KeyError is key event_type listener did not exist
# AttributeError if listener did not exist within event_type
# ValueError if listener did not exist within event_type
pass
@ -593,6 +612,7 @@ class Timer(threading.Thread):
self.daemon = True
self._bus = hass.bus
self.interval = interval or TIMER_INTERVAL
self._stop = threading.Event()
# We want to be able to fire every time a minute starts (seconds=0).
# We want this so other modules can use that to make sure they fire
@ -602,6 +622,9 @@ class Timer(threading.Thread):
hass.listen_once_event(EVENT_HOMEASSISTANT_START,
lambda event: self.start())
hass.listen_once_event(EVENT_HOMEASSISTANT_STOP,
lambda event: self._stop.set())
def run(self):
""" Start the timer. """
@ -612,7 +635,7 @@ class Timer(threading.Thread):
calc_now = dt.datetime.now
interval = self.interval
while True:
while not self._stop.isSet():
now = calc_now()
# First check checks if we are not on a second matching the

View file

@ -20,7 +20,6 @@ except ImportError:
def main():
""" Starts Home Assistant. Will create demo config if no config found. """
tasks = ['serve', 'test']
parser = argparse.ArgumentParser()
parser.add_argument(
@ -29,64 +28,49 @@ def main():
default="config",
help="Directory that contains the Home Assistant configuration")
parser.add_argument(
'-t', '--task',
default=tasks[0],
choices=tasks,
help="Task to execute. Defaults to serve.")
args = parser.parse_args()
if args.task == tasks[1]:
# unittest does not like our command line arguments, remove them
sys.argv[1:] = []
# Validate that all core dependencies are installed
import_fail = False
import unittest
for module in ['requests']:
try:
importlib.import_module(module)
except ImportError:
import_fail = True
print(
'Fatal Error: Unable to find dependency {}'.format(module))
unittest.main(module='homeassistant.test')
if import_fail:
print(("Install dependencies by running: "
"pip3 install -r requirements.txt"))
exit()
else:
# Validate that all core dependencies are installed
import_fail = False
# Test if configuration directory exists
config_dir = os.path.join(os.getcwd(), args.config)
for module in ['requests']:
try:
importlib.import_module(module)
except ImportError:
import_fail = True
print(
'Fatal Error: Unable to find dependency {}'.format(module))
if not os.path.isdir(config_dir):
print(('Fatal Error: Unable to find specified configuration '
'directory {} ').format(config_dir))
sys.exit()
if import_fail:
print(("Install dependencies by running: "
"pip3 install -r requirements.txt"))
exit()
config_path = os.path.join(config_dir, 'home-assistant.conf')
# Test if configuration directory exists
config_dir = os.path.join(os.getcwd(), args.config)
if not os.path.isdir(config_dir):
print(('Fatal Error: Unable to find specified configuration '
'directory {} ').format(config_dir))
# Ensure a config file exists to make first time usage easier
if not os.path.isfile(config_path):
try:
with open(config_path, 'w') as conf:
conf.write("[http]\n")
conf.write("api_password=password\n\n")
conf.write("[demo]\n")
except IOError:
print(('Fatal Error: No configuration file found and unable '
'to write a default one to {}').format(config_path))
sys.exit()
config_path = os.path.join(config_dir, 'home-assistant.conf')
# Ensure a config file exists to make first time usage easier
if not os.path.isfile(config_path):
try:
with open(config_path, 'w') as conf:
conf.write("[http]\n")
conf.write("api_password=password\n\n")
conf.write("[demo]\n")
except IOError:
print(('Fatal Error: No configuration file found and unable '
'to write a default one to {}').format(config_path))
sys.exit()
hass = bootstrap.from_config_file(config_path)
hass.start()
hass.block_till_stopped()
hass = bootstrap.from_config_file(config_path)
hass.start()
hass.block_till_stopped()
if __name__ == "__main__":
main()

View file

@ -16,7 +16,6 @@ Each component should publish services only under its own domain.
"""
import itertools as it
import logging
import importlib
import homeassistant as ha
import homeassistant.util as util
@ -60,7 +59,7 @@ def is_on(hass, entity_id=None):
if entity_id:
group = get_component('group')
entity_ids = group.expand_entity_ids([entity_id])
entity_ids = group.expand_entity_ids(hass, [entity_id])
else:
entity_ids = hass.states.entity_ids
@ -81,13 +80,19 @@ def is_on(hass, entity_id=None):
return False
def turn_on(hass, **service_data):
def turn_on(hass, entity_id=None, **service_data):
""" Turns specified entity on if possible. """
if entity_id is not None:
service_data[ATTR_ENTITY_ID] = entity_id
hass.call_service(ha.DOMAIN, SERVICE_TURN_ON, service_data)
def turn_off(hass, **service_data):
def turn_off(hass, entity_id=None, **service_data):
""" Turns specified entity off. """
if entity_id is not None:
service_data[ATTR_ENTITY_ID] = entity_id
hass.call_service(ha.DOMAIN, SERVICE_TURN_OFF, service_data)
@ -140,7 +145,7 @@ class ToggleDevice(object):
def get_state_attributes(self):
""" Returns optional state attributes. """
return None
return {}
def update(self):
""" Retrieve latest state from the real device. """
@ -170,7 +175,6 @@ def setup(hass, config):
def handle_turn_service(service):
""" Method to handle calls to homeassistant.turn_on/off. """
entity_ids = extract_entity_ids(hass, service)
# Generic turn on/off method requires entity id

View file

@ -6,6 +6,7 @@ Provides functionality to interact with Chromecasts.
"""
import logging
import homeassistant as ha
import homeassistant.util as util
import homeassistant.components as components
@ -113,8 +114,8 @@ def setup(hass, config):
return False
if 'hosts' in config[DOMAIN]:
hosts = config[DOMAIN]['hosts'].split(",")
if ha.CONF_HOSTS in config[DOMAIN]:
hosts = config[DOMAIN][ha.CONF_HOSTS].split(",")
# If no hosts given, scan for chromecasts
else:

View file

@ -111,7 +111,9 @@ def setup(hass, config):
# Setup chromecast
hass.states.set("chromecast.Living_Rm", "Netflix",
{'friendly_name': 'Living Room'})
{'friendly_name': 'Living Room',
ATTR_ENTITY_PICTURE:
'http://graph.facebook.com/KillBillMovie/picture'})
# Setup tellstick sensors
hass.states.set("tellstick_sensor.Outside_temperature", "15.6",

View file

@ -10,8 +10,6 @@ import os
import csv
from datetime import datetime, timedelta
import requests
import homeassistant as ha
from homeassistant.loader import get_component
import homeassistant.util as util
@ -176,10 +174,9 @@ class DeviceTracker(object):
is_new_file = not os.path.isfile(known_dev_path)
with open(known_dev_path, 'a') as outp:
_LOGGER.info((
"Found {} new devices,"
" updating {}").format(len(unknown_devices),
known_dev_path))
_LOGGER.info(
"Found %d new devices, updating %s",
len(unknown_devices), known_dev_path)
writer = csv.writer(outp)
@ -199,10 +196,9 @@ class DeviceTracker(object):
'picture': ""}
except IOError:
_LOGGER.exception((
"Error updating {}"
"with {} new devices").format(known_dev_path,
len(unknown_devices)))
_LOGGER.exception(
"Error updating %s with %d new devices",
known_dev_path, len(unknown_devices))
self.lock.release()
@ -268,9 +264,9 @@ class DeviceTracker(object):
self.path_known_devices_file)
# Remove entities that are no longer maintained
new_entity_ids = set([known_devices[device]['entity_id']
for device in known_devices
if known_devices[device]['track']])
new_entity_ids = set([known_devices[dev]['entity_id']
for dev in known_devices
if known_devices[dev]['track']])
for entity_id in \
self.device_entity_ids - new_entity_ids:

View file

@ -80,8 +80,8 @@ def get_entity_ids(hass, entity_id, domain_filter=None):
entity_ids = hass.states.get(entity_id).attributes[ATTR_ENTITY_ID]
if domain_filter:
return [entity_id for entity_id in entity_ids
if entity_id.startswith(domain_filter)]
return [ent_id for ent_id in entity_ids
if ent_id.startswith(domain_filter)]
else:
return entity_ids

View file

@ -85,8 +85,6 @@ from urllib.parse import urlparse, parse_qs
import homeassistant as ha
import homeassistant.remote as rem
import homeassistant.util as util
from homeassistant.components import (STATE_ON, STATE_OFF,
SERVICE_TURN_ON, SERVICE_TURN_OFF)
from . import frontend
DOMAIN = "http"
@ -138,6 +136,10 @@ def setup(hass, config):
lambda event:
threading.Thread(target=server.start, daemon=True).start())
hass.listen_once_event(
ha.EVENT_HOMEASSISTANT_STOP,
lambda event: server.shutdown())
# If no local api set, set one with known information
if isinstance(hass, rem.HomeAssistant) and hass.local_api is None:
hass.local_api = \
@ -148,6 +150,10 @@ def setup(hass, config):
class HomeAssistantHTTPServer(ThreadingMixIn, HTTPServer):
""" Handle HTTP requests in a threaded fashion. """
# pylint: disable=too-few-public-methods
allow_reuse_address = True
daemon_threads = True
# pylint: disable=too-many-arguments
def __init__(self, server_address, RequestHandlerClass,
@ -348,7 +354,8 @@ class RequestHandler(SimpleHTTPRequestHandler):
else:
app_url = "frontend-{}.html".format(frontend.VERSION)
write(("<html>"
write(("<!doctype html>"
"<html>"
"<head><title>Home Assistant</title>"
"<meta name='mobile-web-app-capable' content='yes'>"
"<link rel='shortcut icon' href='/static/favicon.ico' />"

View file

@ -1,2 +1,2 @@
""" DO NOT MODIFY. Auto-generated by build_frontend script """
VERSION = "d23de9af256f9c1ab74fc3969fa410d3"
VERSION = "12ba7bca8ad0c196cb04ada4fe85a76b"

File diff suppressed because one or more lines are too long

View file

@ -4,15 +4,11 @@
"authors": [
"Paulus Schoutsen <Paulus@PaulusSchoutsen.nl>"
],
"main": "index.htm",
"main": "splash-login.html",
"license": "MIT",
"private": true,
"ignore": [
"**/.*",
"node_modules",
"bower_components",
"test",
"tests"
"bower_components"
],
"dependencies": {
"webcomponentsjs": "Polymer/webcomponentsjs#~0.5.1",
@ -24,6 +20,7 @@
"core-item": "Polymer/core-item#~0.5.1",
"core-input": "Polymer/core-input#~0.5.1",
"core-icons": "polymer/core-icons#~0.5.1",
"core-image": "polymer/core-image#~0.5.1",
"paper-toast": "Polymer/paper-toast#~0.5.1",
"paper-dialog": "Polymer/paper-dialog#~0.5.1",
"paper-spinner": "Polymer/paper-spinner#~0.5.1",

View file

@ -29,6 +29,13 @@
}
@media all and (max-width: 620px) {
paper-action-dialog {
margin: 0;
width: 100%;
height: calc(100% - 64px);
top: 64px;
}
.eventContainer {
display: none;
}

View file

@ -12,59 +12,83 @@
<polymer-element name="home-assistant-main" attributes="api">
<template>
<style type="text/css">
<style>
core-header-panel {
height: 100%;
overflow: auto;
-webkit-overflow-scrolling: touch;
background-color: #E5E5E5;
}
core-toolbar {
background: #03a9f4;
font-size: 1.3rem;
color: white;
}
core-toolbar.tall {
/* 2x normal height */
height: 128px;
}
core-toolbar .bottom {
opacity: 0;
transition: opacity 0.30s ease-out;
}
core-toolbar.tall .bottom {
opacity: 1;
}
paper-tab {
text-transform: uppercase;
}
.dropdown {
paper-menu-button {
margin-top: 5px !important;
}
paper-dropdown {
border-radius: 3px;
}
.menu {
paper-dropdown .menu {
margin: 0;
padding: 8px 0;
color: black;
}
paper-item {
overflow: hidden;
white-space: nowrap;
text-overflow: ellipsis;
}
paper-item a {
text-decoration: none;
}
</style>
<core-header-panel fullbleed>
<core-header-panel fit mode="{{hasCustomGroups && 'waterfall-tall'}}">
<core-toolbar class='medium-tall'>
<div flex>
Home Assistant
</div>
<paper-icon-button icon="refresh" on-click="{{handleRefreshClick}}"></paper-icon-button>
<core-toolbar>
<div flex>Home Assistant</div>
<paper-icon-button icon="refresh"
on-click="{{handleRefreshClick}}"></paper-icon-button>
<paper-icon-button icon="settings-remote"
on-click="{{handleServiceClick}}"></paper-icon-button>
<paper-menu-button>
<paper-icon-button icon="more-vert" noink></paper-icon-button>
<paper-dropdown class="dropdown" halign="right" duration="200">
<paper-dropdown halign="right" duration="200" class="dropdown">
<core-menu class="menu">
<paper-item label="Set State">
<a on-click={{handleAddStateClick}}></a>
<paper-item>
<a on-click={{handleAddStateClick}}>Set State</a>
</paper-item>
<paper-item label="Trigger Event">
<a on-click={{handleEventClick}}></a>
<paper-item>
<a on-click={{handleEventClick}}>Trigger Event</a>
</paper-item>
<paper-item label="Log Out">
<a on-click={{handleLogOutClick}}></a>
<paper-item>
<a on-click={{handleLogOutClick}}>Log Out</a>
</paper-item>
</core-menu>
</paper-dropdown>
@ -76,21 +100,20 @@
<paper-tab>ALL</paper-tab>
<template repeat="{{state in api.states}}">
<template if="{{state.isCustomGroup}}">
<paper-tab data-entity="{{state.entity_id}}">
{{state.entityDisplay}}
</paper-tab>
</template>
<template repeat="{{group in customGroups}}">
<paper-tab data-entity="{{group.entity_id}}">
{{group.entityDisplay}}
</paper-tab>
</template>
</paper-tabs>
</div>
</core-toolbar>
<div class="content" flex>
<states-cards api="{{api}}" filter="{{selectedTab}}"></states-cards>
</div>
<states-cards
api="{{api}}"
filter="{{selectedTab}}"
class="content"></states-cards>
</core-header-panel>
@ -99,6 +122,11 @@
Polymer({
selectedTab: null,
computed: {
customGroups: "getCustomGroups(api.states)",
hasCustomGroups: "customGroups.length > 0"
},
tabClicked: function(ev) {
if(ev.detail.isSelected) {
// will be null for ALL tab
@ -124,6 +152,11 @@
handleLogOutClick: function() {
this.api.logOut();
},
getCustomGroups: function(states) {
return states ?
states.filter(function(state) { return state.isCustomGroup;}) : [];
}
});

View file

@ -30,6 +30,13 @@
}
@media all and (max-width: 620px) {
paper-action-dialog {
margin: 0;
width: 100%;
height: calc(100% - 64px);
top: 64px;
}
.serviceContainer {
display: none;
}

View file

@ -9,12 +9,10 @@
<polymer-element name="splash-login" attributes="auth">
<template>
<style type="text/css">
<style>
:host {
font-family: RobotoDraft, 'Helvetica Neue', Helvetica, Arial;
height: 100%;
overflow: auto;
}
paper-input {
@ -29,11 +27,11 @@
height: 125px;
}
#validateBox {
#validatebox {
text-align: center;
}
#validateMessage {
#validatemessage {
margin-top: 10px;
}
@ -41,32 +39,29 @@
<home-assistant-api auth="{{auth}}" id="api"></home-assistant-api>
<template if="{{state == 'no_auth'}}">
<div layout horizontal center fit class='login'>
<div layout vertical center flex>
<img src="/static/favicon-192x192.png" />
<h1>Home Assistant</h1>
<div layout horizontal center fit class='login' id="splash">
<div layout vertical center flex>
<img src="/static/favicon-192x192.png" />
<h1>Home Assistant</h1>
<a href="#" id="hideKeyboardOnFocus"></a>
<div class='interact' layout vertical>
<div id='loginform'>
<paper-input-decorator label="Password" id="passwordDecorator">
<input is="core-input" type="password" id="passwordInput"
value="{{auth}}" on-keyup="{{passwordKeyup}}" autofocus>
</paper-input-decorator>
<paper-button on-click={{validatePassword}}>Log In</paper-button>
</div>
<div class='interact' layout vertical>
<div id='loginform'>
<paper-input-decorator label="Password" id="passwordDecorator">
<input is="core-input" type="password" id="passwordInput"
value="{{auth}}" on-keyup="{{passwordKeyup}}" autofocus>
</paper-input-decorator>
<paper-button on-click={{validatePassword}}>Log In</paper-button>
</div>
<div id="validateBox" hidden>
<paper-spinner active="true"></paper-spinner><br />
<div id="validateMessage">Validating password...</div>
</div>
<div id="validatebox" hidden>
<paper-spinner active="true"></paper-spinner><br />
<div id="validatemessage">Validating password...</div>
</div>
</div>
</div>
</template>
<template if="{{state == 'valid_auth'}}">
<home-assistant-main api="{{api}}"></home-assistant-main>
</template>
</div>
<home-assistant-main api="{{api}}" hidden id="main"></home-assistant-main>
</template>
<script>
@ -90,32 +85,53 @@
authChanged: function(oldVal, newVal) {
// log out functionality
if(newVal == "" && this.state == "valid_auth") {
if(newVal === "" && this.state === "valid_auth") {
this.state = "no_auth";
this.$.validateMessage.innerHTML = "Validating password...";
}
},
stateChanged: function(oldVal, newVal) {
if(newVal === "no_auth") {
// set login box showing
this.$.loginform.removeAttribute('hidden');
this.$.validatebox.setAttribute('hidden', null);
// reset to initial message
this.$.validatemessage.innerHTML = "Validating password...";
// show splash
this.$.splash.removeAttribute('hidden');
this.$.main.setAttribute('hidden', null);
} else { // valid_auth
this.$.splash.setAttribute('hidden', null);
this.$.main.removeAttribute('hidden');
}
},
passwordKeyup: function(ev) {
if(ev.keyCode == 13) {
// validate on enter
if(ev.keyCode === 13) {
this.validatePassword();
} else {
// clear error after we start typing again
} else if(this.$.passwordDecorator.isInvalid) {
this.$.passwordDecorator.isInvalid = false;
}
},
validatePassword: function() {
this.$.loginform.setAttribute('hidden', null);
this.$.validateBox.removeAttribute('hidden');
this.$.validatebox.removeAttribute('hidden');
this.$.hideKeyboardOnFocus.focus();
var passwordValid = function(result) {
this.$.validateMessage.innerHTML = "Loading data...";
this.$.validatemessage.innerHTML = "Loading data...";
this.api.fetchEvents();
this.api.fetchStates(function() {
this.state = "valid_auth";
}.bind(this));
}
};
var passwordInvalid = function(result) {
if(result && result.message) {
@ -126,9 +142,9 @@
this.auth = null;
this.$.passwordDecorator.isInvalid = true;
this.$.loginform.removeAttribute('hidden');
this.$.validateBox.setAttribute('hidden', null);
this.$.validatebox.setAttribute('hidden', null);
this.$.passwordInput.focus();
}
};
this.api.fetchServices(passwordValid.bind(this), passwordInvalid.bind(this));
}

View file

@ -1,4 +1,5 @@
<link rel="import" href="bower_components/polymer/polymer.html">
<link rel="import" href="bower_components/core-image/core-image.html">
<link rel="import" href="domain-icon.html">
@ -20,7 +21,7 @@
text-align: center;
}
#picture {
core-image {
border-radius: 50%;
}
@ -40,7 +41,11 @@
domain="{{stateObj.domain}}" data-domain="{{stateObj.domain}}"
state="{{stateObj.state}}" data-state="{{stateObj.state}}">
</domain-icon>
<div fit id="picture"></div>
<template if="{{stateObj.attributes.entity_picture}}">
<core-image
sizing="cover" fit
src="{{stateObj.attributes.entity_picture}}"></core-image>
</template>
</div>
</template>
@ -50,8 +55,7 @@
'stateObj.state': 'updateIconColor',
'stateObj.attributes.brightness': 'updateIconColor',
'stateObj.attributes.xy_color[0]': 'updateIconColor',
'stateObj.attributes.xy_color[1]': 'updateIconColor',
'stateObj.attributes.entity_picture': 'entityPictureChanged'
'stateObj.attributes.xy_color[1]': 'updateIconColor'
},
/**
@ -73,17 +77,6 @@
}
},
/**
* Called when the attribute for entity_picture has changed.
*/
entityPictureChanged: function(oldVal, newVal) {
if(newVal) {
this.$.picture.style.backgroundImage = 'url(' + newVal + ')';
} else {
this.$.picture.style.backgroundImage = null;
}
},
// from http://stackoverflow.com/questions/22894498/philips-hue-convert-xy-from-api-to-hex-or-rgb
xyBriToRgb: function (x, y, bri) {
z = 1.0 - x - y;

View file

@ -81,7 +81,7 @@
<div class="time-ago">
<core-tooltip label="{{stateObj.last_changed}}" position="bottom">
{{lastChangedFromNow}}
{{lastChangedFromNow(stateObj.last_changed)}}
</core-tooltip>
</div>
@ -109,7 +109,6 @@
</template>
<script>
Polymer({
// attributes
stateObj: {},
cb_turn_on: null,
@ -118,16 +117,12 @@
stateUnknown: false,
toggleChecked: -1,
computed: {
lastChangedFromNow: "stateObj.last_changed | parseLastChangedFromNow",
},
observe: {
'stateObj.state': 'stateChanged'
},
parseLastChangedFromNow: function(lastChanged) {
return moment(lastChanged, "HH:mm:ss DD-MM-YYYY").fromNow()
lastChangedFromNow: function(lastChanged) {
return moment(lastChanged, "HH:mm:ss DD-MM-YYYY").fromNow();
},
toggleCheckedChanged: function(oldVal, newVal) {
@ -144,8 +139,8 @@
},
stateChanged: function(oldVal, newVal) {
this.stateUnknown = newVal == null;
this.toggleChecked = newVal == "on"
this.stateUnknown = newVal === null;
this.toggleChecked = newVal === "on";
},
turn_on: function() {
@ -155,7 +150,7 @@
// unset state while we wait for an update
var delayUnsetSate = function() {
this.stateObj.state = null;
}
};
setTimeout(delayUnsetSate.bind(this), 500);
}
},
@ -167,7 +162,7 @@
// unset state while we wait for an update
var delayUnsetSate = function() {
this.stateObj.state = null;
}
};
setTimeout(delayUnsetSate.bind(this), 500);
}
},

View file

@ -9,7 +9,7 @@
<polymer-element name="state-set-dialog" attributes="api">
<template>
<paper-action-dialog id="dialog" heading="Set State" transition="core-transition-center" backdrop="true">
<paper-action-dialog id="dialog" heading="Set State" transition="core-transition-bottom" backdrop="true">
<style>
:host {
font-family: RobotoDraft, 'Helvetica Neue', Helvetica, Arial;
@ -28,6 +28,13 @@
}
@media all and (max-width: 620px) {
paper-action-dialog {
margin: 0;
width: 100%;
height: calc(100% - 64px);
top: 64px;
}
.stateContainer {
display: none;
}

View file

@ -41,7 +41,7 @@
</style>
<div horizontal layout wrap>
<template repeat="{{state in states}}">
<template repeat="{{state in getStates(api.states, filter)}}">
<state-card
stateObj="{{state}}"
cb_turn_on="{{api.turn_on}}"
@ -54,53 +54,35 @@
</template>
<script>
Polymer({
raw_states: [],
states: [],
filter: null,
filter_substates: null,
filterChanged: function(oldVal, newVal) {
this.refilterStates();
},
getStates: function(states, filter) {
if(!states) {
return [];
}
ready: function() {
this.editCallback = this.editCallback.bind(this);
},
domReady: function() {
this.raw_states = this.api.states
this.api.addEventListener('states-updated', this.statesUpdated.bind(this))
this.refilterStates();
},
statesUpdated: function() {
this.raw_states = this.api.states;
this.refilterStates();
},
refilterStates: function() {
if(this.filter == null) {
// all states except groups
this.states = this.raw_states.filter(function(state) {
return state.domain != 'group'
if(!filter) {
// if no filter, return all non-group states
return states.filter(function(state) {
return state.domain != 'group';
});
} else {
// we have a filter, return the parent filter and its children
var filter_state = this.api.getState(this.filter);
var map_states = function(entity_id) {
return this.api.getState(entity_id);
}.bind(this)
}.bind(this);
// take the parent state and append it's children
this.states = [filter_state].concat(
filter_state.attributes.entity_id.map(map_states))
return [filter_state].concat(
filter_state.attributes.entity_id.map(map_states));
}
},
ready: function() {
this.editCallback = this.editCallback.bind(this);
},
editCallback: function(entityId) {
this.api.showEditStateDialog(entityId);
},

View file

@ -141,6 +141,35 @@ def setup(hass, config):
if not util.validate_config(config, {DOMAIN: [ha.CONF_TYPE]}, _LOGGER):
return False
# Load built-in profiles and custom profiles
profile_paths = [os.path.join(os.path.dirname(__file__),
LIGHT_PROFILES_FILE),
hass.get_config_path(LIGHT_PROFILES_FILE)]
profiles = {}
for profile_path in profile_paths:
if os.path.isfile(profile_path):
with open(profile_path) as inp:
reader = csv.reader(inp)
# Skip the header
next(reader, None)
try:
for profile_id, color_x, color_y, brightness in reader:
profiles[profile_id] = (float(color_x), float(color_y),
int(brightness))
except ValueError:
# ValueError if not 4 values per row
# ValueError if convert to float/int failed
_LOGGER.error(
"Error parsing light profiles from %s", profile_path)
return False
# Load platform
light_type = config[DOMAIN][ha.CONF_TYPE]
light_init = get_component('light.{}'.format(light_type))
@ -174,34 +203,6 @@ def setup(hass, config):
light.entity_id = entity_id
ent_to_light[entity_id] = light
# Load built-in profiles and custom profiles
profile_paths = [os.path.join(os.path.dirname(__file__),
LIGHT_PROFILES_FILE),
hass.get_config_path(LIGHT_PROFILES_FILE)]
profiles = {}
for profile_path in profile_paths:
if os.path.isfile(profile_path):
with open(profile_path) as inp:
reader = csv.reader(inp)
# Skip the header
next(reader, None)
try:
for profile_id, color_x, color_y, brightness in reader:
profiles[profile_id] = (float(color_x), float(color_y),
int(brightness))
except ValueError:
# ValueError if not 4 values per row
# ValueError if convert to float/int failed
_LOGGER.error(
"Error parsing light profiles from %s", profile_path)
return False
# pylint: disable=unused-argument
def update_lights_state(now):
""" Update the states of all the lights. """
@ -227,11 +228,17 @@ def setup(hass, config):
if not lights:
lights = list(ent_to_light.values())
params = {}
transition = util.convert(dat.get(ATTR_TRANSITION), int)
if transition is not None:
params[ATTR_TRANSITION] = transition
if service.service == SERVICE_TURN_OFF:
for light in lights:
light.turn_off(transition=transition)
# pylint: disable=star-args
light.turn_off(**params)
else:
# Processing extra data for turn light on request
@ -242,20 +249,23 @@ def setup(hass, config):
profile = profiles.get(dat.get(ATTR_PROFILE))
if profile:
*color, bright = profile
else:
color, bright = None, None
# *color, bright = profile
*params[ATTR_XY_COLOR], params[ATTR_BRIGHTNESS] = profile
if ATTR_BRIGHTNESS in dat:
bright = util.convert(dat.get(ATTR_BRIGHTNESS), int)
# We pass in the old value as the default parameter if parsing
# of the new one goes wrong.
params[ATTR_BRIGHTNESS] = util.convert(
dat.get(ATTR_BRIGHTNESS), int, params.get(ATTR_BRIGHTNESS))
if ATTR_XY_COLOR in dat:
try:
# xy_color should be a list containing 2 floats
xy_color = dat.get(ATTR_XY_COLOR)
xycolor = dat.get(ATTR_XY_COLOR)
if len(xy_color) == 2:
color = [float(val) for val in xy_color]
# Without this check, a xycolor with value '99' would work
if not isinstance(xycolor, str):
params[ATTR_XY_COLOR] = [float(val) for val in xycolor]
except (TypeError, ValueError):
# TypeError if xy_color is not iterable
@ -268,9 +278,10 @@ def setup(hass, config):
rgb_color = dat.get(ATTR_RGB_COLOR)
if len(rgb_color) == 3:
color = util.color_RGB_to_xy(int(rgb_color[0]),
int(rgb_color[1]),
int(rgb_color[2]))
params[ATTR_XY_COLOR] = \
util.color_RGB_to_xy(int(rgb_color[0]),
int(rgb_color[1]),
int(rgb_color[2]))
except (TypeError, ValueError):
# TypeError if rgb_color is not iterable
@ -278,8 +289,8 @@ def setup(hass, config):
pass
for light in lights:
light.turn_on(transition=transition, brightness=bright,
xy_color=color)
# pylint: disable=star-args
light.turn_on(**params)
for light in lights:
light.update_ha_state(hass, True)

View file

@ -17,9 +17,7 @@ def get_lights(hass, config):
""" Gets the Hue lights. """
logger = logging.getLogger(__name__)
try:
# Pylint does not play nice if not every folders has an __init__.py
# pylint: disable=no-name-in-module, import-error
import homeassistant.external.phue.phue as phue
import phue
except ImportError:
logger.exception("Error while importing dependency phue.")
@ -100,16 +98,16 @@ class HueLight(ToggleDevice):
""" Turn the specified or all lights on. """
command = {'on': True}
if kwargs.get(ATTR_TRANSITION) is not None:
if ATTR_TRANSITION in kwargs:
# Transition time is in 1/10th seconds and cannot exceed
# 900 seconds.
command['transitiontime'] = min(9000, kwargs['transition'] * 10)
command['transitiontime'] = min(9000, kwargs[ATTR_TRANSITION] * 10)
if kwargs.get(ATTR_BRIGHTNESS) is not None:
command['bri'] = kwargs['brightness']
if ATTR_BRIGHTNESS in kwargs:
command['bri'] = kwargs[ATTR_BRIGHTNESS]
if kwargs.get(ATTR_XY_COLOR) is not None:
command['xy'] = kwargs['xy_color']
if ATTR_XY_COLOR in kwargs:
command['xy'] = kwargs[ATTR_XY_COLOR]
self.bridge.set_light(self.light_id, command)
@ -117,10 +115,10 @@ class HueLight(ToggleDevice):
""" Turn the specified or all lights off. """
command = {'on': False}
if kwargs.get('transition') is not None:
if ATTR_TRANSITION in kwargs:
# Transition time is in 1/10th seconds and cannot exceed
# 900 seconds.
command['transitiontime'] = min(9000, kwargs['transition'] * 10)
command['transitiontime'] = min(9000, kwargs[ATTR_TRANSITION] * 10)
self.bridge.set_light(self.light_id, command)

View file

@ -5,7 +5,7 @@ homeassistant.components.sun
Provides functionality to keep track of the sun.
"""
import logging
from datetime import timedelta
from datetime import datetime, timedelta
import homeassistant as ha
import homeassistant.util as util
@ -28,8 +28,10 @@ def is_on(hass, entity_id=None):
return hass.states.is_state(entity_id, STATE_ABOVE_HORIZON)
def next_setting(hass):
def next_setting(hass, entity_id=None):
""" Returns the datetime object representing the next sun setting. """
entity_id = entity_id or ENTITY_ID
state = hass.states.get(ENTITY_ID)
try:
@ -40,8 +42,10 @@ def next_setting(hass):
return None
def next_rising(hass):
def next_rising(hass, entity_id=None):
""" Returns the datetime object representing the next sun rising. """
entity_id = entity_id or ENTITY_ID
state = hass.states.get(ENTITY_ID)
try:
@ -73,15 +77,39 @@ def setup(hass, config):
latitude = config[ha.DOMAIN][ha.CONF_LATITUDE]
longitude = config[ha.DOMAIN][ha.CONF_LONGITUDE]
def update_sun_state(now): # pylint: disable=unused-argument
# Validate latitude and longitude
observer = ephem.Observer()
errors = []
try:
observer.lat = latitude # pylint: disable=assigning-non-slot
except ValueError:
errors.append("invalid value for latitude given: {}".format(latitude))
try:
observer.long = longitude # pylint: disable=assigning-non-slot
except ValueError:
errors.append("invalid value for latitude given: {}".format(latitude))
if errors:
logger.error("Error setting up: %s", ", ".join(errors))
return False
def update_sun_state(now):
""" Method to update the current state of the sun and
set time of next setting and rising. """
utc_offset = datetime.utcnow() - datetime.now()
utc_now = now + utc_offset
observer = ephem.Observer()
observer.lat = latitude # pylint: disable=assigning-non-slot
observer.long = longitude # pylint: disable=assigning-non-slot
next_rising_dt = ephem.localtime(observer.next_rising(sun))
next_setting_dt = ephem.localtime(observer.next_setting(sun))
next_rising_dt = ephem.localtime(
observer.next_rising(sun, start=utc_now))
next_setting_dt = ephem.localtime(
observer.next_setting(sun, start=utc_now))
if next_rising_dt > next_setting_dt:
new_state = STATE_ABOVE_HORIZON
@ -101,10 +129,10 @@ def setup(hass, config):
hass.states.set(ENTITY_ID, new_state, state_attributes)
# +10 seconds to be sure that the change has occured
# +1 second so Ephem will report it has set
hass.track_point_in_time(update_sun_state,
next_change + timedelta(seconds=10))
next_change + timedelta(seconds=1))
update_sun_state(None)
update_sun_state(datetime.now())
return True

View file

@ -11,7 +11,7 @@ import homeassistant.util as util
from homeassistant.loader import get_component
from homeassistant.components import (
group, extract_entity_ids, STATE_ON,
SERVICE_TURN_ON, SERVICE_TURN_OFF, ATTR_ENTITY_ID, ATTR_FRIENDLY_NAME)
SERVICE_TURN_ON, SERVICE_TURN_OFF, ATTR_ENTITY_ID)
DOMAIN = 'switch'
DEPENDENCIES = []
@ -129,7 +129,7 @@ def setup(hass, config):
switch.update_ha_state(hass)
# Track all wemos in a group
# Track all switches in a group
group.setup_group(hass, GROUP_NAME_ALL_SWITCHES,
ent_to_switch.keys(), False)

View file

@ -1,11 +0,0 @@
"""
Not all external Git repositories that we depend on are
available as a package for pip. That is why we include
them here.
PyNetgear
------------
https://github.com/balloob/pynetgear
"""

@ -1 +0,0 @@
Subproject commit c96d8d5dbe08adfe3919734c1c8403cd7ec4873e

View file

@ -3,7 +3,17 @@ homeassistant.loader
~~~~~~~~~~~~~~~~~~~~
Provides methods for loading Home Assistant components.
This module has quite some complex parts. I have tried to add as much
documentation as possible to keep it understandable.
Components are loaded by calling get_component('switch') from your code.
If you want to retrieve a platform that is part of a component, you should
call get_component('switch.your_platform'). In both cases the config directory
is checked to see if it contains a user provided version. If not available it
will check the built-in components and platforms.
"""
import os
import sys
import pkgutil
import importlib
@ -30,22 +40,32 @@ def prepare(hass):
pkgutil.iter_modules(components.__path__, 'homeassistant.components.'))
# Look for available custom components
custom_path = hass.get_config_path("custom_components")
# Ensure we can load custom components from the config dir
sys.path.append(hass.config_dir)
if os.path.isdir(custom_path):
# Ensure we can load custom components using Pythons import
sys.path.insert(0, hass.config_dir)
try:
# pylint: disable=import-error
import custom_components
# We cannot use the same approach as for built-in components because
# custom components might only contain a platform for a component.
# ie custom_components/switch/some_platform.py. Using pkgutil would
# not give us the switch component (and neither should it).
AVAILABLE_COMPONENTS.extend(
item[1] for item in
pkgutil.iter_modules(
custom_components.__path__, 'custom_components.'))
# Assumption: the custom_components dir only contains directories or
# python components. If this assumption is not true, HA won't break,
# just might output more errors.
for fil in os.listdir(custom_path):
if os.path.isdir(os.path.join(custom_path, fil)):
AVAILABLE_COMPONENTS.append('custom_components.{}'.format(fil))
except ImportError:
# No folder custom_components exist in the config directory
pass
else:
AVAILABLE_COMPONENTS.append(
'custom_components.{}'.format(fil[0:-3]))
def set_component(comp_name, component):
""" Sets a component in the cache. """
_COMPONENT_CACHE[comp_name] = component
def get_component(comp_name):
@ -61,8 +81,10 @@ def get_component(comp_name):
# an exception because it will try to import the parent.
# Because of this behavior, we will approach loading sub components
# with caution: only load it if we can verify that the parent exists.
# We do not want to silent the ImportErrors as they provide valuable
# information to track down when debugging Home Assistant.
# First check config dir, then built-in
# First check custom, then built-in
potential_paths = ['custom_components.{}'.format(comp_name),
'homeassistant.components.{}'.format(comp_name)]
@ -76,17 +98,30 @@ def get_component(comp_name):
continue
try:
_COMPONENT_CACHE[comp_name] = importlib.import_module(path)
module = importlib.import_module(path)
# In Python 3 you can import files from directories that do not
# contain the file __init__.py. A directory is a valid module if
# it contains a file with the .py extension. In this case Python
# will succeed in importing the directory as a module and call it
# a namespace. We do not care about namespaces.
# This prevents that when only
# custom_components/switch/some_platform.py exists,
# the import custom_components.switch would succeeed.
if module.__spec__.origin == 'namespace':
continue
_LOGGER.info("Loaded %s from %s", comp_name, path)
return _COMPONENT_CACHE[comp_name]
_COMPONENT_CACHE[comp_name] = module
return module
except ImportError:
_LOGGER.exception(
("Error loading %s. Make sure all "
"dependencies are installed"), path)
# We did find components but were unable to load them
_LOGGER.error("Unable to load component %s", comp_name)
_LOGGER.error("Unable to find component %s", comp_name)
return None

View file

@ -38,9 +38,9 @@ METHOD_POST = "post"
_LOGGER = logging.getLogger(__name__)
# pylint: disable=no-init, invalid-name
class APIStatus(enum.Enum):
""" Represents API status. """
# pylint: disable=no-init,invalid-name,too-few-public-methods
OK = "ok"
INVALID_PASSWORD = "invalid_password"
@ -134,9 +134,22 @@ class HomeAssistant(ha.HomeAssistant):
self.bus.fire(ha.EVENT_HOMEASSISTANT_START,
origin=ha.EventOrigin.remote)
def stop(self):
""" Stops Home Assistant and shuts down all threads. """
_LOGGER.info("Stopping")
self.bus.fire(ha.EVENT_HOMEASSISTANT_STOP,
origin=ha.EventOrigin.remote)
# Wait till all responses to homeassistant_stop are done
self._pool.block_till_done()
self._pool.stop()
class EventBus(ha.EventBus):
""" EventBus implementation that forwards fire_event to remote API. """
# pylint: disable=too-few-public-methods
def __init__(self, api, pool=None):
super().__init__(pool)
@ -240,10 +253,11 @@ class StateMachine(ha.StateMachine):
class JSONEncoder(json.JSONEncoder):
""" JSONEncoder that supports Home Assistant objects. """
# pylint: disable=too-few-public-methods,method-hidden
def default(self, obj): # pylint: disable=method-hidden
""" Checks if Home Assistat object and encodes if possible.
Else hand it off to original method. """
def default(self, obj):
""" Converts Home Assistant objects and hands
other objects to the original method. """
if isinstance(obj, ha.State):
return obj.as_dict()
@ -362,7 +376,10 @@ def get_states(api):
def set_state(api, entity_id, new_state, attributes=None):
""" Tells API to update state for entity_id. """
"""
Tells API to update state for entity_id.
Returns True if success.
"""
attributes = attributes or {}
@ -374,13 +391,18 @@ def set_state(api, entity_id, new_state, attributes=None):
URL_API_STATES_ENTITY.format(entity_id),
data)
if req.status_code != 201:
if req.status_code not in (200, 201):
_LOGGER.error("Error changing state: %d - %s",
req.status_code, req.text)
return False
else:
return True
except ha.HomeAssistantError:
_LOGGER.exception("Error setting state")
return False
def is_state(api, entity_id, state):
""" Queries API to see if entity_id is specified state. """

View file

@ -1,433 +0,0 @@
"""
homeassistant.test
~~~~~~~~~~~~~~~~~~
Provides tests to verify that Home Assistant modules do what they should do.
"""
import unittest
import time
import json
import requests
import homeassistant as ha
import homeassistant.remote as remote
import homeassistant.components.http as http
API_PASSWORD = "test1234"
HTTP_BASE_URL = "http://127.0.0.1:{}".format(remote.SERVER_PORT)
HA_HEADERS = {remote.AUTH_HEADER: API_PASSWORD}
def _url(path=""):
""" Helper method to generate urls. """
return HTTP_BASE_URL + path
class HAHelper(object): # pylint: disable=too-few-public-methods
""" Helper class to keep track of current running HA instance. """
hass = None
slave = None
def ensure_homeassistant_started():
""" Ensures home assistant is started. """
if not HAHelper.hass:
hass = ha.HomeAssistant()
hass.bus.listen('test_event', lambda _: _)
hass.states.set('test.test', 'a_state')
http.setup(hass,
{http.DOMAIN: {http.CONF_API_PASSWORD: API_PASSWORD}})
hass.start()
# Give objects time to startup
time.sleep(1)
HAHelper.hass = hass
return HAHelper.hass
def ensure_slave_started():
""" Ensure a home assistant slave is started. """
ensure_homeassistant_started()
if not HAHelper.slave:
local_api = remote.API("127.0.0.1", API_PASSWORD, 8124)
remote_api = remote.API("127.0.0.1", API_PASSWORD)
slave = remote.HomeAssistant(remote_api, local_api)
http.setup(slave,
{http.DOMAIN: {http.CONF_API_PASSWORD: API_PASSWORD,
http.CONF_SERVER_PORT: 8124}})
slave.start()
# Give objects time to startup
time.sleep(1)
HAHelper.slave = slave
return HAHelper.slave
# pylint: disable=too-many-public-methods
class TestHTTP(unittest.TestCase):
""" Test the HTTP debug interface and API. """
@classmethod
def setUpClass(cls): # pylint: disable=invalid-name
""" things to be run when tests are started. """
cls.hass = ensure_homeassistant_started()
def test_api_password(self):
""" Test if we get access denied if we omit or provide
a wrong api password. """
req = requests.get(
_url(remote.URL_API_STATES_ENTITY.format("test")))
self.assertEqual(req.status_code, 401)
req = requests.get(
_url(remote.URL_API_STATES_ENTITY.format("test")),
headers={remote.AUTH_HEADER: 'wrongpassword'})
self.assertEqual(req.status_code, 401)
def test_api_list_state_entities(self):
""" Test if the debug interface allows us to list state entities. """
req = requests.get(_url(remote.URL_API_STATES),
headers=HA_HEADERS)
remote_data = [ha.State.from_dict(item) for item in req.json()]
self.assertEqual(self.hass.states.all(), remote_data)
def test_api_get_state(self):
""" Test if the debug interface allows us to get a state. """
req = requests.get(
_url(remote.URL_API_STATES_ENTITY.format("test.test")),
headers=HA_HEADERS)
data = ha.State.from_dict(req.json())
state = self.hass.states.get("test.test")
self.assertEqual(data.state, state.state)
self.assertEqual(data.last_changed, state.last_changed)
self.assertEqual(data.attributes, state.attributes)
def test_api_get_non_existing_state(self):
""" Test if the debug interface allows us to get a state. """
req = requests.get(
_url(remote.URL_API_STATES_ENTITY.format("does_not_exist")),
headers=HA_HEADERS)
self.assertEqual(req.status_code, 404)
def test_api_state_change(self):
""" Test if we can change the state of an entity that exists. """
self.hass.states.set("test.test", "not_to_be_set")
requests.post(_url(remote.URL_API_STATES_ENTITY.format("test.test")),
data=json.dumps({"state": "debug_state_change2",
"api_password": API_PASSWORD}))
self.assertEqual(self.hass.states.get("test.test").state,
"debug_state_change2")
# pylint: disable=invalid-name
def test_api_state_change_of_non_existing_entity(self):
""" Test if the API allows us to change a state of
a non existing entity. """
new_state = "debug_state_change"
req = requests.post(
_url(remote.URL_API_STATES_ENTITY.format(
"test_entity.that_does_not_exist")),
data=json.dumps({"state": new_state,
"api_password": API_PASSWORD}))
cur_state = (self.hass.states.
get("test_entity.that_does_not_exist").state)
self.assertEqual(req.status_code, 201)
self.assertEqual(cur_state, new_state)
# pylint: disable=invalid-name
def test_api_fire_event_with_no_data(self):
""" Test if the API allows us to fire an event. """
test_value = []
def listener(event): # pylint: disable=unused-argument
""" Helper method that will verify our event got called. """
test_value.append(1)
self.hass.listen_once_event("test.event_no_data", listener)
requests.post(
_url(remote.URL_API_EVENTS_EVENT.format("test.event_no_data")),
headers=HA_HEADERS)
# Allow the event to take place
time.sleep(1)
self.assertEqual(len(test_value), 1)
# pylint: disable=invalid-name
def test_api_fire_event_with_data(self):
""" Test if the API allows us to fire an event. """
test_value = []
def listener(event): # pylint: disable=unused-argument
""" Helper method that will verify that our event got called and
that test if our data came through. """
if "test" in event.data:
test_value.append(1)
self.hass.listen_once_event("test_event_with_data", listener)
requests.post(
_url(remote.URL_API_EVENTS_EVENT.format("test_event_with_data")),
data=json.dumps({"test": 1}),
headers=HA_HEADERS)
# Allow the event to take place
time.sleep(1)
self.assertEqual(len(test_value), 1)
# pylint: disable=invalid-name
def test_api_fire_event_with_invalid_json(self):
""" Test if the API allows us to fire an event. """
test_value = []
def listener(event): # pylint: disable=unused-argument
""" Helper method that will verify our event got called. """
test_value.append(1)
self.hass.listen_once_event("test_event_with_bad_data", listener)
req = requests.post(
_url(remote.URL_API_EVENTS_EVENT.format("test_event")),
data=json.dumps('not an object'),
headers=HA_HEADERS)
# It shouldn't but if it fires, allow the event to take place
time.sleep(1)
self.assertEqual(req.status_code, 422)
self.assertEqual(len(test_value), 0)
def test_api_get_event_listeners(self):
""" Test if we can get the list of events being listened for. """
req = requests.get(_url(remote.URL_API_EVENTS),
headers=HA_HEADERS)
local = self.hass.bus.listeners
for event in req.json():
self.assertEqual(event["listener_count"],
local.pop(event["event"]))
self.assertEqual(len(local), 0)
def test_api_get_services(self):
""" Test if we can get a dict describing current services. """
req = requests.get(_url(remote.URL_API_SERVICES),
headers=HA_HEADERS)
local_services = self.hass.services.services
for serv_domain in req.json():
local = local_services.pop(serv_domain["domain"])
self.assertEqual(serv_domain["services"], local)
def test_api_call_service_no_data(self):
""" Test if the API allows us to call a service. """
test_value = []
def listener(service_call): # pylint: disable=unused-argument
""" Helper method that will verify that our service got called. """
test_value.append(1)
self.hass.services.register("test_domain", "test_service", listener)
requests.post(
_url(remote.URL_API_SERVICES_SERVICE.format(
"test_domain", "test_service")),
headers=HA_HEADERS)
# Allow the event to take place
time.sleep(1)
self.assertEqual(len(test_value), 1)
def test_api_call_service_with_data(self):
""" Test if the API allows us to call a service. """
test_value = []
def listener(service_call): # pylint: disable=unused-argument
""" Helper method that will verify that our service got called and
that test if our data came through. """
if "test" in service_call.data:
test_value.append(1)
self.hass.services.register("test_domain", "test_service", listener)
requests.post(
_url(remote.URL_API_SERVICES_SERVICE.format(
"test_domain", "test_service")),
data=json.dumps({"test": 1}),
headers=HA_HEADERS)
# Allow the event to take place
time.sleep(1)
self.assertEqual(len(test_value), 1)
class TestRemoteMethods(unittest.TestCase):
""" Test the homeassistant.remote module. """
@classmethod
def setUpClass(cls): # pylint: disable=invalid-name
""" things to be run when tests are started. """
cls.hass = ensure_homeassistant_started()
cls.api = remote.API("127.0.0.1", API_PASSWORD)
def test_get_event_listeners(self):
""" Test Python API get_event_listeners. """
local = self.hass.bus.listeners
for event in remote.get_event_listeners(self.api):
self.assertEqual(event["listener_count"],
local.pop(event["event"]))
self.assertEqual(len(local), 0)
def test_fire_event(self):
""" Test Python API fire_event. """
test_value = []
def listener(event): # pylint: disable=unused-argument
""" Helper method that will verify our event got called. """
test_value.append(1)
self.hass.listen_once_event("test.event_no_data", listener)
remote.fire_event(self.api, "test.event_no_data")
# Allow the event to take place
time.sleep(1)
self.assertEqual(len(test_value), 1)
def test_get_state(self):
""" Test Python API get_state. """
self.assertEqual(
remote.get_state(self.api, 'test.test'),
self.hass.states.get('test.test'))
def test_get_states(self):
""" Test Python API get_state_entity_ids. """
self.assertEqual(
remote.get_states(self.api), self.hass.states.all())
def test_set_state(self):
""" Test Python API set_state. """
remote.set_state(self.api, 'test.test', 'set_test')
self.assertEqual(self.hass.states.get('test.test').state, 'set_test')
def test_is_state(self):
""" Test Python API is_state. """
self.assertTrue(
remote.is_state(self.api, 'test.test',
self.hass.states.get('test.test').state))
def test_get_services(self):
""" Test Python API get_services. """
local_services = self.hass.services.services
for serv_domain in remote.get_services(self.api):
local = local_services.pop(serv_domain["domain"])
self.assertEqual(serv_domain["services"], local)
def test_call_service(self):
""" Test Python API call_service. """
test_value = []
def listener(service_call): # pylint: disable=unused-argument
""" Helper method that will verify that our service got called. """
test_value.append(1)
self.hass.services.register("test_domain", "test_service", listener)
remote.call_service(self.api, "test_domain", "test_service")
# Allow the event to take place
time.sleep(1)
self.assertEqual(len(test_value), 1)
class TestRemoteClasses(unittest.TestCase):
""" Test the homeassistant.remote module. """
@classmethod
def setUpClass(cls): # pylint: disable=invalid-name
""" things to be run when tests are started. """
cls.hass = ensure_homeassistant_started()
cls.slave = ensure_slave_started()
def test_statemachine_init(self):
""" Tests if remote.StateMachine copies all states on init. """
for state in self.hass.states.all():
self.assertEqual(
self.slave.states.get(state.entity_id), state)
def test_statemachine_set(self):
""" Tests if setting the state on a slave is recorded. """
self.slave.states.set("remote.test", "remote.statemachine test")
# Allow interaction between 2 instances
time.sleep(1)
self.assertEqual(self.slave.states.get("remote.test").state,
"remote.statemachine test")
def test_eventbus_fire(self):
""" Test if events fired from the eventbus get fired. """
test_value = []
def listener(event): # pylint: disable=unused-argument
""" Helper method that will verify our event got called. """
test_value.append(1)
self.slave.listen_once_event("test.event_no_data", listener)
self.slave.bus.fire("test.event_no_data")
# Allow the event to take place
time.sleep(1)
self.assertEqual(len(test_value), 1)

View file

@ -24,13 +24,13 @@ def sanitize_filename(filename):
def sanitize_path(path):
""" Sanitizes a path by removing .. / and \\. """
""" Sanitizes a path by removing ~ and .. """
return RE_SANITIZE_PATH.sub("", path)
def slugify(text):
""" Slugifies a given text. """
text = text.strip().replace(" ", "_")
text = text.replace(" ", "_")
return RE_SLUGIFY.sub("", text)
@ -76,6 +76,9 @@ def repr_helper(inp):
# pylint: disable=invalid-name
def color_RGB_to_xy(R, G, B):
""" Convert from RGB color to XY color. """
if R + G + B == 0:
return 0, 0
var_R = (R / 255.)
var_G = (G / 255.)
var_B = (B / 255.)
@ -124,7 +127,7 @@ def ensure_unique_string(preferred_string, current_strings):
tries = 1
while preferred_string in current_strings:
while string in current_strings:
tries += 1
string = "{}_{}".format(preferred_string, tries)
@ -150,7 +153,7 @@ def get_local_ip():
class OrderedEnum(enum.Enum):
""" Taken from Python 3.4.0 docs. """
# pylint: disable=no-init
# pylint: disable=no-init, too-few-public-methods
def __ge__(self, other):
if self.__class__ is other.__class__:
@ -185,6 +188,8 @@ def validate_config(config, items, logger):
"""
errors_found = False
for domain in items.keys():
config.setdefault(domain, {})
errors = [item for item in items[domain] if item not in config[domain]]
if errors:
@ -212,8 +217,8 @@ class ThreadPool(object):
""" A simple queue-based thread pool.
Will initiate it's workers using worker(queue).start() """
# pylint: disable=too-many-instance-attributes
# pylint: disable=too-few-public-methods
def __init__(self, worker_count, job_handler, busy_callback=None):
"""
worker_count: number of threads to run that handle jobs
@ -221,30 +226,62 @@ class ThreadPool(object):
busy_callback: method to be called when queue gets too big.
Parameters: list_of_current_jobs, number_pending_jobs
"""
work_queue = self.work_queue = queue.PriorityQueue()
current_jobs = self.current_jobs = []
self.work_queue = work_queue = queue.PriorityQueue()
self.current_jobs = current_jobs = []
self.worker_count = worker_count
self.busy_callback = busy_callback
self.busy_warning_limit = worker_count**2
self._lock = threading.RLock()
self._quit_task = object()
for _ in range(worker_count):
worker = threading.Thread(target=_threadpool_worker,
args=(work_queue, current_jobs,
job_handler))
job_handler, self._quit_task))
worker.daemon = True
worker.start()
self.running = True
def add_job(self, priority, job):
""" Add a job to be sent to the workers. """
self.work_queue.put(PriorityQueueItem(priority, job))
with self._lock:
if not self.running:
raise Exception("We are shutting down the ")
# check if our queue is getting too big
if self.work_queue.qsize() > self.busy_warning_limit \
and self.busy_callback is not None:
self.work_queue.put(PriorityQueueItem(priority, job))
# Increase limit we will issue next warning
self.busy_warning_limit *= 2
# check if our queue is getting too big
if self.work_queue.qsize() > self.busy_warning_limit \
and self.busy_callback is not None:
self.busy_callback(self.current_jobs, self.work_queue.qsize())
# Increase limit we will issue next warning
self.busy_warning_limit *= 2
self.busy_callback(self.current_jobs, self.work_queue.qsize())
def block_till_done(self):
""" Blocks till all work is done. """
self.work_queue.join()
def stop(self):
""" Stops all the threads. """
with self._lock:
if not self.running:
return
# Clear the queue
while self.work_queue.qsize() > 0:
self.work_queue.get()
self.work_queue.task_done()
# Tell the workers to quit
for _ in range(self.worker_count):
self.add_job(1000, self._quit_task)
self.running = False
self.block_till_done()
class PriorityQueueItem(object):
@ -259,12 +296,16 @@ class PriorityQueueItem(object):
return self.priority < other.priority
def _threadpool_worker(work_queue, current_jobs, job_handler):
def _threadpool_worker(work_queue, current_jobs, job_handler, quit_task):
""" Provides the base functionality of a worker for the thread pool. """
while True:
# Get new item from work_queue
job = work_queue.get().item
if job == quit_task:
work_queue.task_done()
return
# Add to current running jobs
job_log = (datetime.datetime.now(), job)
current_jobs.append(job_log)

View file

@ -6,7 +6,12 @@ reports=no
# locally-disabled - it spams too much
# duplicate-code - unavoidable
# cyclic-import - doesn't test if both import on load
disable=locally-disabled,duplicate-code,cyclic-import
# file-ignored - we ignore a file to work around a pylint bug
disable=
locally-disabled,
duplicate-code,
cyclic-import,
file-ignored
[EXCEPTIONS]
overgeneral-exceptions=Exception,HomeAssistantError

View file

@ -1,5 +1,19 @@
# required
requests>=2.0
pychromecast>=0.5
# optional, needed for specific components
# sun
pyephem>=3.7
# lights.hue
phue>=0.8
# chromecast
pychromecast>=0.5
# keyboard
pyuserinput>=0.1.9
# switch.tellstick, tellstick_sensor
tellcore-py>=1.0.4

5
run_tests.sh Executable file
View file

@ -0,0 +1,5 @@
#!/bin/bash
pylint homeassistant
flake8 homeassistant --exclude bower_components,external
python3 -m unittest discover test

View file

@ -0,0 +1,3 @@
"""
Module to be loaded by the Loader test.
"""

30
test/helper.py Normal file
View file

@ -0,0 +1,30 @@
"""
test.helper
~~~~~~~~~~~
Helper method for writing tests.
"""
import os
import homeassistant as ha
def get_test_home_assistant():
""" Returns a Home Assistant object pointing at test config dir. """
hass = ha.HomeAssistant()
hass.config_dir = os.path.join(os.path.dirname(__file__), "config")
return hass
def mock_service(hass, domain, service):
"""
Sets up a fake service.
Returns a list that logs all calls to fake service.
"""
calls = []
hass.services.register(
domain, service, lambda call: calls.append(call))
return calls

View file

@ -0,0 +1,64 @@
"""
test.mock.switch_platform
~~~~~~~~~~~~~~~~~~~~~~~~~
Provides a mock switch platform.
Call init before using it in your tests to ensure clean test data.
"""
import homeassistant.components as components
class MockToggleDevice(components.ToggleDevice):
""" Fake switch. """
def __init__(self, name, state):
self.name = name
self.state = state
self.calls = []
def get_name(self):
""" Returns the name of the device if any. """
self.calls.append(('get_name', {}))
return self.name
def turn_on(self, **kwargs):
""" Turn the device on. """
self.calls.append(('turn_on', kwargs))
self.state = components.STATE_ON
def turn_off(self, **kwargs):
""" Turn the device off. """
self.calls.append(('turn_off', kwargs))
self.state = components.STATE_OFF
def is_on(self):
""" True if device is on. """
self.calls.append(('is_on', {}))
return self.state == components.STATE_ON
def last_call(self, method=None):
if method is None:
return self.calls[-1]
else:
return next(call for call in reversed(self.calls)
if call[0] == method)
DEVICES = []
def init(empty=False):
""" (re-)initalizes the platform with devices. """
global DEVICES
DEVICES = [] if empty else [
MockToggleDevice('AC', components.STATE_ON),
MockToggleDevice('AC', components.STATE_OFF),
MockToggleDevice(None, components.STATE_OFF)
]
def get_switches(hass, config):
""" Returns mock devices. """
return DEVICES
get_lights = get_switches

View file

@ -0,0 +1,87 @@
"""
test.test_component_chromecast
~~~~~~~~~~~
Tests Chromecast component.
"""
# pylint: disable=too-many-public-methods,protected-access
import logging
import unittest
import homeassistant as ha
import homeassistant.components as components
import homeassistant.components.chromecast as chromecast
from helper import mock_service
def setUpModule(): # pylint: disable=invalid-name
""" Setup to ignore chromecast errors. """
logging.disable(logging.CRITICAL)
class TestChromecast(unittest.TestCase):
""" Test the chromecast module. """
def setUp(self): # pylint: disable=invalid-name
self.hass = ha.HomeAssistant()
self.test_entity = chromecast.ENTITY_ID_FORMAT.format('living_room')
self.hass.states.set(self.test_entity, chromecast.STATE_NO_APP)
self.test_entity2 = chromecast.ENTITY_ID_FORMAT.format('bedroom')
self.hass.states.set(self.test_entity2, "Youtube")
def tearDown(self): # pylint: disable=invalid-name
""" Stop down stuff we started. """
self.hass._pool.stop()
def test_is_on(self):
""" Test is_on method. """
self.assertFalse(chromecast.is_on(self.hass, self.test_entity))
self.assertTrue(chromecast.is_on(self.hass, self.test_entity2))
def test_services(self):
"""
Test if the call service methods conver to correct service calls.
"""
services = {
components.SERVICE_TURN_OFF: chromecast.turn_off,
components.SERVICE_VOLUME_UP: chromecast.volume_up,
components.SERVICE_VOLUME_DOWN: chromecast.volume_down,
components.SERVICE_MEDIA_PLAY_PAUSE: chromecast.media_play_pause,
components.SERVICE_MEDIA_PLAY: chromecast.media_play,
components.SERVICE_MEDIA_PAUSE: chromecast.media_pause,
components.SERVICE_MEDIA_NEXT_TRACK: chromecast.media_next_track,
components.SERVICE_MEDIA_PREV_TRACK: chromecast.media_prev_track
}
for service_name, service_method in services.items():
calls = mock_service(self.hass, chromecast.DOMAIN, service_name)
service_method(self.hass)
self.hass._pool.block_till_done()
self.assertEqual(1, len(calls))
call = calls[-1]
self.assertEqual(call.domain, chromecast.DOMAIN)
self.assertEqual(call.service, service_name)
self.assertEqual(call.data, {})
service_method(self.hass, self.test_entity)
self.hass._pool.block_till_done()
self.assertEqual(2, len(calls))
call = calls[-1]
self.assertEqual(call.domain, chromecast.DOMAIN)
self.assertEqual(call.service, service_name)
self.assertEqual(call.data,
{components.ATTR_ENTITY_ID: self.test_entity})
def test_setup(self):
"""
Test Chromecast setup.
We do not have access to a Chromecast while testing so test errors.
In an ideal world we would create a mock pychromecast API..
"""
self.assertFalse(chromecast.setup(
self.hass, {chromecast.DOMAIN: {ha.CONF_HOSTS: '127.0.0.1'}}))

View file

@ -0,0 +1,73 @@
"""
test.test_component_core
~~~~~~~~~~~~~~~~~~~~~~~~
Tests core compoments.
"""
# pylint: disable=protected-access,too-many-public-methods
import unittest
import homeassistant as ha
import homeassistant.loader as loader
import homeassistant.components as comps
class TestComponentsCore(unittest.TestCase):
""" Tests homeassistant.components module. """
def setUp(self): # pylint: disable=invalid-name
""" Init needed objects. """
self.hass = ha.HomeAssistant()
loader.prepare(self.hass)
self.assertTrue(comps.setup(self.hass, {}))
self.hass.states.set('light.Bowl', comps.STATE_ON)
self.hass.states.set('light.Ceiling', comps.STATE_OFF)
def tearDown(self): # pylint: disable=invalid-name
""" Stop down stuff we started. """
self.hass._pool.stop()
def test_is_on(self):
""" Test is_on method. """
self.assertTrue(comps.is_on(self.hass, 'light.Bowl'))
self.assertFalse(comps.is_on(self.hass, 'light.Ceiling'))
self.assertTrue(comps.is_on(self.hass))
def test_turn_on(self):
""" Test turn_on method. """
runs = []
self.hass.services.register(
'light', comps.SERVICE_TURN_ON, lambda x: runs.append(1))
comps.turn_on(self.hass, 'light.Ceiling')
self.hass._pool.block_till_done()
self.assertEqual(1, len(runs))
def test_turn_off(self):
""" Test turn_off method. """
runs = []
self.hass.services.register(
'light', comps.SERVICE_TURN_OFF, lambda x: runs.append(1))
comps.turn_off(self.hass, 'light.Bowl')
self.hass._pool.block_till_done()
self.assertEqual(1, len(runs))
def test_extract_entity_ids(self):
""" Test extract_entity_ids method. """
call = ha.ServiceCall('light', 'turn_on',
{comps.ATTR_ENTITY_ID: 'light.Bowl'})
self.assertEqual(['light.Bowl'],
comps.extract_entity_ids(self.hass, call))
call = ha.ServiceCall('light', 'turn_on',
{comps.ATTR_ENTITY_ID: ['light.Bowl']})
self.assertEqual(['light.Bowl'],
comps.extract_entity_ids(self.hass, call))

View file

@ -0,0 +1,163 @@
"""
test.test_component_group
~~~~~~~~~~~~~~~~~~~~~~~~~
Tests the group compoments.
"""
# pylint: disable=protected-access,too-many-public-methods
import unittest
import logging
import homeassistant as ha
import homeassistant.components as comps
import homeassistant.components.group as group
def setUpModule(): # pylint: disable=invalid-name
""" Setup to ignore group errors. """
logging.disable(logging.CRITICAL)
class TestComponentsGroup(unittest.TestCase):
""" Tests homeassistant.components.group module. """
def setUp(self): # pylint: disable=invalid-name
""" Init needed objects. """
self.hass = ha.HomeAssistant()
self.hass.states.set('light.Bowl', comps.STATE_ON)
self.hass.states.set('light.Ceiling', comps.STATE_OFF)
self.hass.states.set('switch.AC', comps.STATE_OFF)
group.setup_group(self.hass, 'init_group',
['light.Bowl', 'light.Ceiling'], False)
group.setup_group(self.hass, 'mixed_group',
['light.Bowl', 'switch.AC'], False)
self.group_name = group.ENTITY_ID_FORMAT.format('init_group')
self.mixed_group_name = group.ENTITY_ID_FORMAT.format('mixed_group')
def tearDown(self): # pylint: disable=invalid-name
""" Stop down stuff we started. """
self.hass.stop()
def test_setup_and_monitor_group(self):
""" Test setup_group method. """
# Test if group setup in our init mode is ok
self.assertIn(self.group_name, self.hass.states.entity_ids)
group_state = self.hass.states.get(self.group_name)
self.assertEqual(comps.STATE_ON, group_state.state)
self.assertTrue(group_state.attributes[group.ATTR_AUTO])
# Turn the Bowl off and see if group turns off
self.hass.states.set('light.Bowl', comps.STATE_OFF)
self.hass._pool.block_till_done()
group_state = self.hass.states.get(self.group_name)
self.assertEqual(comps.STATE_OFF, group_state.state)
# Turn the Ceiling on and see if group turns on
self.hass.states.set('light.Ceiling', comps.STATE_ON)
self.hass._pool.block_till_done()
group_state = self.hass.states.get(self.group_name)
self.assertEqual(comps.STATE_ON, group_state.state)
# Try to setup a group with mixed groupable states
self.hass.states.set('device_tracker.Paulus', comps.STATE_HOME)
self.assertFalse(group.setup_group(
self.hass, 'person_and_light',
['light.Bowl', 'device_tracker.Paulus']))
# Try to setup a group with a non existing state
self.assertNotIn('non.existing', self.hass.states.entity_ids)
self.assertFalse(group.setup_group(
self.hass, 'light_and_nothing',
['light.Bowl', 'non.existing']))
# Try to setup a group with non groupable states
self.hass.states.set('cast.living_room', "Plex")
self.hass.states.set('cast.bedroom', "Netflix")
self.assertFalse(
group.setup_group(
self.hass, 'chromecasts',
['cast.living_room', 'cast.bedroom']))
# Try to setup an empty group
self.assertFalse(group.setup_group(self.hass, 'nothing', []))
def test__get_group_type(self):
""" Test _get_group_type method. """
self.assertEqual('on_off', group._get_group_type(comps.STATE_ON))
self.assertEqual('on_off', group._get_group_type(comps.STATE_OFF))
self.assertEqual('home_not_home',
group._get_group_type(comps.STATE_HOME))
self.assertEqual('home_not_home',
group._get_group_type(comps.STATE_NOT_HOME))
# Unsupported state
self.assertIsNone(group._get_group_type('unsupported_state'))
def test_is_on(self):
""" Test is_on method. """
self.assertTrue(group.is_on(self.hass, self.group_name))
self.hass.states.set('light.Bowl', comps.STATE_OFF)
self.hass._pool.block_till_done()
self.assertFalse(group.is_on(self.hass, self.group_name))
# Try on non existing state
self.assertFalse(group.is_on(self.hass, 'non.existing'))
def test_expand_entity_ids(self):
""" Test expand_entity_ids method. """
self.assertEqual(sorted(['light.Ceiling', 'light.Bowl']),
sorted(group.expand_entity_ids(
self.hass, [self.group_name])))
# Make sure that no duplicates are returned
self.assertEqual(
sorted(['light.Ceiling', 'light.Bowl']),
sorted(group.expand_entity_ids(
self.hass, [self.group_name, 'light.Ceiling'])))
# Test that non strings are ignored
self.assertEqual([], group.expand_entity_ids(self.hass, [5, True]))
def test_get_entity_ids(self):
""" Test get_entity_ids method. """
# Get entity IDs from our group
self.assertEqual(
sorted(['light.Ceiling', 'light.Bowl']),
sorted(group.get_entity_ids(self.hass, self.group_name)))
# Test domain_filter
self.assertEqual(
['switch.AC'],
group.get_entity_ids(
self.hass, self.mixed_group_name, domain_filter="switch"))
# Test with non existing group name
self.assertEqual([], group.get_entity_ids(self.hass, 'non_existing'))
# Test with non-group state
self.assertEqual([], group.get_entity_ids(self.hass, 'switch.AC'))
def test_setup(self):
""" Test setup method. """
self.assertTrue(
group.setup(
self.hass,
{
group.DOMAIN: {
'second_group': '{},light.Bowl'.format(self.group_name)
}
}))
group_state = self.hass.states.get(
group.ENTITY_ID_FORMAT.format('second_group'))
self.assertEqual(comps.STATE_ON, group_state.state)
self.assertFalse(group_state.attributes[group.ATTR_AUTO])

281
test/test_component_http.py Normal file
View file

@ -0,0 +1,281 @@
"""
test.test_component_http
~~~~~~~~~~~~~~~~~~~~~~~~
Tests Home Assistant HTTP component does what it should do.
"""
# pylint: disable=protected-access,too-many-public-methods
import re
import unittest
import json
import requests
import homeassistant as ha
import homeassistant.remote as remote
import homeassistant.components.http as http
API_PASSWORD = "test1234"
# Somehow the socket that holds the default port does not get released
# when we close down HA in a different test case. Until I have figured
# out what is going on, let's run this test on a different port.
SERVER_PORT = 8120
HTTP_BASE_URL = "http://127.0.0.1:{}".format(SERVER_PORT)
HA_HEADERS = {remote.AUTH_HEADER: API_PASSWORD}
hass = None
def _url(path=""):
""" Helper method to generate urls. """
return HTTP_BASE_URL + path
def setUpModule(): # pylint: disable=invalid-name
""" Initalizes a Home Assistant server. """
global hass
hass = ha.HomeAssistant()
hass.bus.listen('test_event', lambda _: _)
hass.states.set('test.test', 'a_state')
http.setup(hass,
{http.DOMAIN: {http.CONF_API_PASSWORD: API_PASSWORD,
http.CONF_SERVER_PORT: SERVER_PORT}})
hass.start()
def tearDownModule(): # pylint: disable=invalid-name
""" Stops the Home Assistant server. """
global hass
hass.stop()
class TestHTTP(unittest.TestCase):
""" Test the HTTP debug interface and API. """
def test_get_frontend(self):
""" Tests if we can get the frontend. """
req = requests.get(_url(""))
self.assertEqual(200, req.status_code)
frontendjs = re.search(
r'(?P<app>\/static\/frontend-[A-Za-z0-9]{32}.html)',
req.text).groups(0)[0]
self.assertIsNotNone(frontendjs)
req = requests.get(_url(frontendjs))
self.assertEqual(200, req.status_code)
def test_api_password(self):
""" Test if we get access denied if we omit or provide
a wrong api password. """
req = requests.get(
_url(remote.URL_API_STATES_ENTITY.format("test")))
self.assertEqual(401, req.status_code)
req = requests.get(
_url(remote.URL_API_STATES_ENTITY.format("test")),
headers={remote.AUTH_HEADER: 'wrongpassword'})
self.assertEqual(401, req.status_code)
def test_api_list_state_entities(self):
""" Test if the debug interface allows us to list state entities. """
req = requests.get(_url(remote.URL_API_STATES),
headers=HA_HEADERS)
remote_data = [ha.State.from_dict(item) for item in req.json()]
self.assertEqual(hass.states.all(), remote_data)
def test_api_get_state(self):
""" Test if the debug interface allows us to get a state. """
req = requests.get(
_url(remote.URL_API_STATES_ENTITY.format("test.test")),
headers=HA_HEADERS)
data = ha.State.from_dict(req.json())
state = hass.states.get("test.test")
self.assertEqual(state.state, data.state)
self.assertEqual(state.last_changed, data.last_changed)
self.assertEqual(state.attributes, data.attributes)
def test_api_get_non_existing_state(self):
""" Test if the debug interface allows us to get a state. """
req = requests.get(
_url(remote.URL_API_STATES_ENTITY.format("does_not_exist")),
headers=HA_HEADERS)
self.assertEqual(404, req.status_code)
def test_api_state_change(self):
""" Test if we can change the state of an entity that exists. """
hass.states.set("test.test", "not_to_be_set")
requests.post(_url(remote.URL_API_STATES_ENTITY.format("test.test")),
data=json.dumps({"state": "debug_state_change2",
"api_password": API_PASSWORD}))
self.assertEqual("debug_state_change2",
hass.states.get("test.test").state)
# pylint: disable=invalid-name
def test_api_state_change_of_non_existing_entity(self):
""" Test if the API allows us to change a state of
a non existing entity. """
new_state = "debug_state_change"
req = requests.post(
_url(remote.URL_API_STATES_ENTITY.format(
"test_entity.that_does_not_exist")),
data=json.dumps({"state": new_state,
"api_password": API_PASSWORD}))
cur_state = (hass.states.
get("test_entity.that_does_not_exist").state)
self.assertEqual(201, req.status_code)
self.assertEqual(cur_state, new_state)
# pylint: disable=invalid-name
def test_api_fire_event_with_no_data(self):
""" Test if the API allows us to fire an event. """
test_value = []
def listener(event): # pylint: disable=unused-argument
""" Helper method that will verify our event got called. """
test_value.append(1)
hass.listen_once_event("test.event_no_data", listener)
requests.post(
_url(remote.URL_API_EVENTS_EVENT.format("test.event_no_data")),
headers=HA_HEADERS)
hass._pool.block_till_done()
self.assertEqual(1, len(test_value))
# pylint: disable=invalid-name
def test_api_fire_event_with_data(self):
""" Test if the API allows us to fire an event. """
test_value = []
def listener(event): # pylint: disable=unused-argument
""" Helper method that will verify that our event got called and
that test if our data came through. """
if "test" in event.data:
test_value.append(1)
hass.listen_once_event("test_event_with_data", listener)
requests.post(
_url(remote.URL_API_EVENTS_EVENT.format("test_event_with_data")),
data=json.dumps({"test": 1}),
headers=HA_HEADERS)
hass._pool.block_till_done()
self.assertEqual(1, len(test_value))
# pylint: disable=invalid-name
def test_api_fire_event_with_invalid_json(self):
""" Test if the API allows us to fire an event. """
test_value = []
def listener(event): # pylint: disable=unused-argument
""" Helper method that will verify our event got called. """
test_value.append(1)
hass.listen_once_event("test_event_bad_data", listener)
req = requests.post(
_url(remote.URL_API_EVENTS_EVENT.format("test_event_bad_data")),
data=json.dumps('not an object'),
headers=HA_HEADERS)
hass._pool.block_till_done()
self.assertEqual(422, req.status_code)
self.assertEqual(0, len(test_value))
def test_api_get_event_listeners(self):
""" Test if we can get the list of events being listened for. """
req = requests.get(_url(remote.URL_API_EVENTS),
headers=HA_HEADERS)
local = hass.bus.listeners
for event in req.json():
self.assertEqual(event["listener_count"],
local.pop(event["event"]))
self.assertEqual(0, len(local))
def test_api_get_services(self):
""" Test if we can get a dict describing current services. """
req = requests.get(_url(remote.URL_API_SERVICES),
headers=HA_HEADERS)
local_services = hass.services.services
for serv_domain in req.json():
local = local_services.pop(serv_domain["domain"])
self.assertEqual(local, serv_domain["services"])
def test_api_call_service_no_data(self):
""" Test if the API allows us to call a service. """
test_value = []
def listener(service_call): # pylint: disable=unused-argument
""" Helper method that will verify that our service got called. """
test_value.append(1)
hass.services.register("test_domain", "test_service", listener)
requests.post(
_url(remote.URL_API_SERVICES_SERVICE.format(
"test_domain", "test_service")),
headers=HA_HEADERS)
hass._pool.block_till_done()
self.assertEqual(1, len(test_value))
def test_api_call_service_with_data(self):
""" Test if the API allows us to call a service. """
test_value = []
def listener(service_call): # pylint: disable=unused-argument
""" Helper method that will verify that our service got called and
that test if our data came through. """
if "test" in service_call.data:
test_value.append(1)
hass.services.register("test_domain", "test_service", listener)
requests.post(
_url(remote.URL_API_SERVICES_SERVICE.format(
"test_domain", "test_service")),
data=json.dumps({"test": 1}),
headers=HA_HEADERS)
hass._pool.block_till_done()
self.assertEqual(1, len(test_value))

View file

@ -0,0 +1,272 @@
"""
test.test_component_switch
~~~~~~~~~~~~~~~~~~~~~~~~~~
Tests switch component.
"""
# pylint: disable=too-many-public-methods,protected-access
import unittest
import os
import homeassistant as ha
import homeassistant.loader as loader
import homeassistant.util as util
import homeassistant.components as components
import homeassistant.components.light as light
import mock_toggledevice_platform
from helper import mock_service, get_test_home_assistant
class TestLight(unittest.TestCase):
""" Test the switch module. """
def setUp(self): # pylint: disable=invalid-name
self.hass = get_test_home_assistant()
loader.prepare(self.hass)
loader.set_component('light.test', mock_toggledevice_platform)
def tearDown(self): # pylint: disable=invalid-name
""" Stop down stuff we started. """
self.hass._pool.stop()
user_light_file = self.hass.get_config_path(light.LIGHT_PROFILES_FILE)
if os.path.isfile(user_light_file):
os.remove(user_light_file)
def test_methods(self):
""" Test if methods call the services as expected. """
# Test is_on
self.hass.states.set('light.test', components.STATE_ON)
self.assertTrue(light.is_on(self.hass, 'light.test'))
self.hass.states.set('light.test', components.STATE_OFF)
self.assertFalse(light.is_on(self.hass, 'light.test'))
self.hass.states.set(light.ENTITY_ID_ALL_LIGHTS, components.STATE_ON)
self.assertTrue(light.is_on(self.hass))
self.hass.states.set(light.ENTITY_ID_ALL_LIGHTS, components.STATE_OFF)
self.assertFalse(light.is_on(self.hass))
# Test turn_on
turn_on_calls = mock_service(
self.hass, light.DOMAIN, components.SERVICE_TURN_ON)
light.turn_on(
self.hass,
entity_id='entity_id_val',
transition='transition_val',
brightness='brightness_val',
rgb_color='rgb_color_val',
xy_color='xy_color_val',
profile='profile_val')
self.hass._pool.block_till_done()
self.assertEqual(1, len(turn_on_calls))
call = turn_on_calls[-1]
self.assertEqual(light.DOMAIN, call.domain)
self.assertEqual(components.SERVICE_TURN_ON, call.service)
self.assertEqual('entity_id_val', call.data[components.ATTR_ENTITY_ID])
self.assertEqual('transition_val', call.data[light.ATTR_TRANSITION])
self.assertEqual('brightness_val', call.data[light.ATTR_BRIGHTNESS])
self.assertEqual('rgb_color_val', call.data[light.ATTR_RGB_COLOR])
self.assertEqual('xy_color_val', call.data[light.ATTR_XY_COLOR])
self.assertEqual('profile_val', call.data[light.ATTR_PROFILE])
# Test turn_off
turn_off_calls = mock_service(
self.hass, light.DOMAIN, components.SERVICE_TURN_OFF)
light.turn_off(
self.hass, entity_id='entity_id_val', transition='transition_val')
self.hass._pool.block_till_done()
self.assertEqual(1, len(turn_off_calls))
call = turn_off_calls[-1]
self.assertEqual(light.DOMAIN, call.domain)
self.assertEqual(components.SERVICE_TURN_OFF, call.service)
self.assertEqual('entity_id_val', call.data[components.ATTR_ENTITY_ID])
self.assertEqual('transition_val', call.data[light.ATTR_TRANSITION])
def test_services(self):
""" Test the provided services. """
mock_toggledevice_platform.init()
self.assertTrue(
light.setup(self.hass, {light.DOMAIN: {ha.CONF_TYPE: 'test'}}))
dev1, dev2, dev3 = mock_toggledevice_platform.get_lights(None, None)
# Test init
self.assertTrue(light.is_on(self.hass, dev1.entity_id))
self.assertFalse(light.is_on(self.hass, dev2.entity_id))
self.assertFalse(light.is_on(self.hass, dev3.entity_id))
# Test basic turn_on, turn_off services
light.turn_off(self.hass, entity_id=dev1.entity_id)
light.turn_on(self.hass, entity_id=dev2.entity_id)
self.hass._pool.block_till_done()
self.assertFalse(light.is_on(self.hass, dev1.entity_id))
self.assertTrue(light.is_on(self.hass, dev2.entity_id))
# turn on all lights
light.turn_on(self.hass)
self.hass._pool.block_till_done()
self.assertTrue(light.is_on(self.hass, dev1.entity_id))
self.assertTrue(light.is_on(self.hass, dev2.entity_id))
self.assertTrue(light.is_on(self.hass, dev3.entity_id))
# turn off all lights
light.turn_off(self.hass)
self.hass._pool.block_till_done()
self.assertFalse(light.is_on(self.hass, dev1.entity_id))
self.assertFalse(light.is_on(self.hass, dev2.entity_id))
self.assertFalse(light.is_on(self.hass, dev3.entity_id))
# Ensure all attributes process correctly
light.turn_on(self.hass, dev1.entity_id,
transition=10, brightness=20)
light.turn_on(
self.hass, dev2.entity_id, rgb_color=[255, 255, 255])
light.turn_on(self.hass, dev3.entity_id, xy_color=[.4, .6])
self.hass._pool.block_till_done()
method, data = dev1.last_call('turn_on')
self.assertEqual(
{light.ATTR_TRANSITION: 10,
light.ATTR_BRIGHTNESS: 20},
data)
method, data = dev2.last_call('turn_on')
self.assertEqual(
{light.ATTR_XY_COLOR: util.color_RGB_to_xy(255, 255, 255)},
data)
method, data = dev3.last_call('turn_on')
self.assertEqual({light.ATTR_XY_COLOR: [.4, .6]}, data)
# One of the light profiles
prof_name, prof_x, prof_y, prof_bri = 'relax', 0.5119, 0.4147, 144
# Test light profiles
light.turn_on(self.hass, dev1.entity_id, profile=prof_name)
# Specify a profile and attributes to overwrite it
light.turn_on(
self.hass, dev2.entity_id,
profile=prof_name, brightness=100, xy_color=[.4, .6])
self.hass._pool.block_till_done()
method, data = dev1.last_call('turn_on')
self.assertEqual(
{light.ATTR_BRIGHTNESS: prof_bri,
light.ATTR_XY_COLOR: [prof_x, prof_y]},
data)
method, data = dev2.last_call('turn_on')
self.assertEqual(
{light.ATTR_BRIGHTNESS: 100,
light.ATTR_XY_COLOR: [.4, .6]},
data)
# Test shitty data
light.turn_on(self.hass, dev1.entity_id, profile="nonexisting")
light.turn_on(self.hass, dev2.entity_id, xy_color=["bla-di-bla", 5])
light.turn_on(self.hass, dev3.entity_id, rgb_color=[255, None, 2])
self.hass._pool.block_till_done()
method, data = dev1.last_call('turn_on')
self.assertEqual({}, data)
method, data = dev2.last_call('turn_on')
self.assertEqual({}, data)
method, data = dev3.last_call('turn_on')
self.assertEqual({}, data)
# faulty attributes should not overwrite profile data
light.turn_on(
self.hass, dev1.entity_id,
profile=prof_name, brightness='bright', rgb_color='yellowish')
self.hass._pool.block_till_done()
method, data = dev1.last_call('turn_on')
self.assertEqual(
{light.ATTR_BRIGHTNESS: prof_bri,
light.ATTR_XY_COLOR: [prof_x, prof_y]},
data)
def test_setup(self):
""" Test the setup method. """
# Bogus config
self.assertFalse(light.setup(self.hass, {}))
self.assertFalse(light.setup(self.hass, {light.DOMAIN: {}}))
# Test with non-existing component
self.assertFalse(light.setup(
self.hass, {light.DOMAIN: {ha.CONF_TYPE: 'nonexisting'}}
))
# Test if light component returns 0 lightes
mock_toggledevice_platform.init(True)
self.assertEqual(
[], mock_toggledevice_platform.get_lights(None, None))
self.assertFalse(light.setup(
self.hass, {light.DOMAIN: {ha.CONF_TYPE: 'test'}}
))
def test_light_profiles(self):
""" Test light profiles. """
mock_toggledevice_platform.init()
user_light_file = self.hass.get_config_path(light.LIGHT_PROFILES_FILE)
# Setup a wrong light file
with open(user_light_file, 'w') as user_file:
user_file.write('id,x,y,brightness\n')
user_file.write('I,WILL,NOT,WORK\n')
self.assertFalse(light.setup(
self.hass, {light.DOMAIN: {ha.CONF_TYPE: 'test'}}
))
# Clean up broken file
os.remove(user_light_file)
with open(user_light_file, 'w') as user_file:
user_file.write('id,x,y,brightness\n')
user_file.write('test,.4,.6,100\n')
self.assertTrue(light.setup(
self.hass, {light.DOMAIN: {ha.CONF_TYPE: 'test'}}
))
dev1, dev2, dev3 = mock_toggledevice_platform.get_lights(None, None)
light.turn_on(self.hass, dev1.entity_id, profile='test')
self.hass._pool.block_till_done()
method, data = dev1.last_call('turn_on')
self.assertEqual(
{light.ATTR_XY_COLOR: [.4, .6], light.ATTR_BRIGHTNESS: 100},
data)

124
test/test_component_sun.py Normal file
View file

@ -0,0 +1,124 @@
"""
test.test_component_sun
~~~~~~~~~~~~~~~~~~~~~~~
Tests Sun component.
"""
# pylint: disable=too-many-public-methods,protected-access
import unittest
import datetime as dt
import ephem
import homeassistant as ha
import homeassistant.components.sun as sun
class TestSun(unittest.TestCase):
""" Test the sun module. """
def setUp(self): # pylint: disable=invalid-name
self.hass = ha.HomeAssistant()
def tearDown(self): # pylint: disable=invalid-name
""" Stop down stuff we started. """
self.hass._pool.stop()
def test_is_on(self):
""" Test is_on method. """
self.hass.states.set(sun.ENTITY_ID, sun.STATE_ABOVE_HORIZON)
self.assertTrue(sun.is_on(self.hass))
self.hass.states.set(sun.ENTITY_ID, sun.STATE_BELOW_HORIZON)
self.assertFalse(sun.is_on(self.hass))
def test_setting_rising(self):
""" Test retrieving sun setting and rising. """
# Compare it with the real data
self.assertTrue(sun.setup(
self.hass,
{ha.DOMAIN: {
ha.CONF_LATITUDE: '32.87336',
ha.CONF_LONGITUDE: '117.22743'
}}))
observer = ephem.Observer()
observer.lat = '32.87336' # pylint: disable=assigning-non-slot
observer.long = '117.22743' # pylint: disable=assigning-non-slot
utc_now = dt.datetime.utcnow()
body_sun = ephem.Sun() # pylint: disable=no-member
next_rising_dt = ephem.localtime(
observer.next_rising(body_sun, start=utc_now))
next_setting_dt = ephem.localtime(
observer.next_setting(body_sun, start=utc_now))
# Home Assistant strips out microseconds
# strip it out of the datetime objects
next_rising_dt = next_rising_dt - dt.timedelta(
microseconds=next_rising_dt.microsecond)
next_setting_dt = next_setting_dt - dt.timedelta(
microseconds=next_setting_dt.microsecond)
self.assertEqual(next_rising_dt, sun.next_rising(self.hass))
self.assertEqual(next_setting_dt, sun.next_setting(self.hass))
# Point it at a state without the proper attributes
self.hass.states.set(sun.ENTITY_ID, sun.STATE_ABOVE_HORIZON)
self.assertIsNone(sun.next_rising(self.hass))
self.assertIsNone(sun.next_setting(self.hass))
# Point it at a non-existing state
self.assertIsNone(sun.next_rising(self.hass, 'non.existing'))
self.assertIsNone(sun.next_setting(self.hass, 'non.existing'))
def test_state_change(self):
""" Test if the state changes at next setting/rising. """
self.assertTrue(sun.setup(
self.hass,
{ha.DOMAIN: {
ha.CONF_LATITUDE: '32.87336',
ha.CONF_LONGITUDE: '117.22743'
}}))
if sun.is_on(self.hass):
test_state = sun.STATE_BELOW_HORIZON
test_time = sun.next_setting(self.hass)
else:
test_state = sun.STATE_ABOVE_HORIZON
test_time = sun.next_rising(self.hass)
self.assertIsNotNone(test_time)
self.hass.bus.fire(ha.EVENT_TIME_CHANGED,
{ha.ATTR_NOW: test_time + dt.timedelta(seconds=5)})
self.hass._pool.block_till_done()
self.assertEqual(test_state, self.hass.states.get(sun.ENTITY_ID).state)
def test_setup(self):
""" Test Sun setup with empty and wrong configs. """
self.assertFalse(sun.setup(self.hass, {}))
self.assertFalse(sun.setup(self.hass, {sun.DOMAIN: {}}))
self.assertFalse(sun.setup(
self.hass, {ha.DOMAIN: {ha.CONF_LATITUDE: '32.87336'}}))
self.assertFalse(sun.setup(
self.hass, {ha.DOMAIN: {ha.CONF_LONGITUDE: '117.22743'}}))
self.assertFalse(sun.setup(
self.hass, {ha.DOMAIN: {ha.CONF_LATITUDE: 'hello'}}))
self.assertFalse(sun.setup(
self.hass, {ha.DOMAIN: {ha.CONF_LONGITUDE: 'how are you'}}))
self.assertFalse(sun.setup(
self.hass, {ha.DOMAIN: {
ha.CONF_LATITUDE: 'wrong', ha.CONF_LONGITUDE: '117.22743'
}}))
self.assertFalse(sun.setup(
self.hass, {ha.DOMAIN: {
ha.CONF_LATITUDE: '32.87336', ha.CONF_LONGITUDE: 'wrong'
}}))
# Test with correct config
self.assertTrue(sun.setup(
self.hass, {ha.DOMAIN: {
ha.CONF_LATITUDE: '32.87336', ha.CONF_LONGITUDE: '117.22743'
}}))

View file

@ -0,0 +1,103 @@
"""
test.test_component_switch
~~~~~~~~~~~~~~~~~~~~~~~~~~
Tests switch component.
"""
# pylint: disable=too-many-public-methods,protected-access
import unittest
import homeassistant as ha
import homeassistant.loader as loader
import homeassistant.components as components
import homeassistant.components.switch as switch
import mock_toggledevice_platform
class TestSwitch(unittest.TestCase):
""" Test the switch module. """
def setUp(self): # pylint: disable=invalid-name
self.hass = ha.HomeAssistant()
loader.prepare(self.hass)
loader.set_component('switch.test', mock_toggledevice_platform)
mock_toggledevice_platform.init()
self.assertTrue(switch.setup(
self.hass, {switch.DOMAIN: {ha.CONF_TYPE: 'test'}}
))
# Switch 1 is ON, switch 2 is OFF
self.switch_1, self.switch_2, self.switch_3 = \
mock_toggledevice_platform.get_switches(None, None)
def tearDown(self): # pylint: disable=invalid-name
""" Stop down stuff we started. """
self.hass._pool.stop()
def test_methods(self):
""" Test is_on, turn_on, turn_off methods. """
self.assertTrue(switch.is_on(self.hass))
self.assertEqual(
components.STATE_ON,
self.hass.states.get(switch.ENTITY_ID_ALL_SWITCHES).state)
self.assertTrue(switch.is_on(self.hass, self.switch_1.entity_id))
self.assertFalse(switch.is_on(self.hass, self.switch_2.entity_id))
self.assertFalse(switch.is_on(self.hass, self.switch_3.entity_id))
switch.turn_off(self.hass, self.switch_1.entity_id)
switch.turn_on(self.hass, self.switch_2.entity_id)
self.hass._pool.block_till_done()
self.assertTrue(switch.is_on(self.hass))
self.assertFalse(switch.is_on(self.hass, self.switch_1.entity_id))
self.assertTrue(switch.is_on(self.hass, self.switch_2.entity_id))
# Turn all off
switch.turn_off(self.hass)
self.hass._pool.block_till_done()
self.assertFalse(switch.is_on(self.hass))
self.assertEqual(
components.STATE_OFF,
self.hass.states.get(switch.ENTITY_ID_ALL_SWITCHES).state)
self.assertFalse(switch.is_on(self.hass, self.switch_1.entity_id))
self.assertFalse(switch.is_on(self.hass, self.switch_2.entity_id))
self.assertFalse(switch.is_on(self.hass, self.switch_3.entity_id))
# Turn all on
switch.turn_on(self.hass)
self.hass._pool.block_till_done()
self.assertTrue(switch.is_on(self.hass))
self.assertEqual(
components.STATE_ON,
self.hass.states.get(switch.ENTITY_ID_ALL_SWITCHES).state)
self.assertTrue(switch.is_on(self.hass, self.switch_1.entity_id))
self.assertTrue(switch.is_on(self.hass, self.switch_2.entity_id))
self.assertTrue(switch.is_on(self.hass, self.switch_3.entity_id))
def test_setup(self):
# Bogus config
self.assertFalse(switch.setup(self.hass, {}))
self.assertFalse(switch.setup(self.hass, {switch.DOMAIN: {}}))
# Test with non-existing component
self.assertFalse(switch.setup(
self.hass, {switch.DOMAIN: {ha.CONF_TYPE: 'nonexisting'}}
))
# Test if switch component returns 0 switches
mock_toggledevice_platform.init(True)
self.assertEqual(
[], mock_toggledevice_platform.get_switches(None, None))
self.assertFalse(switch.setup(
self.hass, {switch.DOMAIN: {ha.CONF_TYPE: 'test'}}
))

319
test/test_core.py Normal file
View file

@ -0,0 +1,319 @@
"""
test.test_core
~~~~~~~~~~~~~~
Provides tests to verify that Home Assistant core works.
"""
# pylint: disable=protected-access,too-many-public-methods
# pylint: disable=too-few-public-methods
import os
import unittest
import time
import threading
from datetime import datetime
import homeassistant as ha
class TestHomeAssistant(unittest.TestCase):
"""
Tests the Home Assistant core classes.
Currently only includes tests to test cases that do not
get tested in the API integration tests.
"""
def setUp(self): # pylint: disable=invalid-name
""" things to be run when tests are started. """
self.hass = ha.HomeAssistant()
self.hass.states.set("light.Bowl", "on")
self.hass.states.set("switch.AC", "off")
def tearDown(self): # pylint: disable=invalid-name
""" Stop down stuff we started. """
self.hass._pool.stop()
def test_get_config_path(self):
""" Test get_config_path method. """
self.assertEqual(os.path.join(os.getcwd(), "config"),
self.hass.config_dir)
self.assertEqual(os.path.join(os.getcwd(), "config", "test.conf"),
self.hass.get_config_path("test.conf"))
def test_block_till_stoped(self):
""" Test if we can block till stop service is called. """
blocking_thread = threading.Thread(target=self.hass.block_till_stopped)
self.assertFalse(blocking_thread.is_alive())
blocking_thread.start()
# Python will now give attention to the other thread
time.sleep(1)
self.assertTrue(blocking_thread.is_alive())
self.hass.call_service(ha.DOMAIN, ha.SERVICE_HOMEASSISTANT_STOP)
self.hass._pool.block_till_done()
# hass.block_till_stopped checks every second if it should quit
# we have to wait worst case 1 second
wait_loops = 0
while blocking_thread.is_alive() and wait_loops < 10:
wait_loops += 1
time.sleep(0.1)
self.assertFalse(blocking_thread.is_alive())
def test_get_entity_ids(self):
""" Test get_entity_ids method. """
ent_ids = self.hass.get_entity_ids()
self.assertEqual(2, len(ent_ids))
self.assertTrue('light.Bowl' in ent_ids)
self.assertTrue('switch.AC' in ent_ids)
ent_ids = self.hass.get_entity_ids('light')
self.assertEqual(1, len(ent_ids))
self.assertTrue('light.Bowl' in ent_ids)
def test_track_state_change(self):
""" Test track_state_change. """
# 2 lists to track how often our callbacks got called
specific_runs = []
wildcard_runs = []
self.hass.track_state_change(
'light.Bowl', lambda a, b, c: specific_runs.append(1), 'on', 'off')
self.hass.track_state_change(
'light.Bowl', lambda a, b, c: wildcard_runs.append(1),
ha.MATCH_ALL, ha.MATCH_ALL)
# Set same state should not trigger a state change/listener
self.hass.states.set('light.Bowl', 'on')
self.hass._pool.block_till_done()
self.assertEqual(0, len(specific_runs))
self.assertEqual(0, len(wildcard_runs))
# State change off -> on
self.hass.states.set('light.Bowl', 'off')
self.hass._pool.block_till_done()
self.assertEqual(1, len(specific_runs))
self.assertEqual(1, len(wildcard_runs))
# State change off -> off
self.hass.states.set('light.Bowl', 'off', {"some_attr": 1})
self.hass._pool.block_till_done()
self.assertEqual(1, len(specific_runs))
self.assertEqual(2, len(wildcard_runs))
# State change off -> on
self.hass.states.set('light.Bowl', 'on')
self.hass._pool.block_till_done()
self.assertEqual(1, len(specific_runs))
self.assertEqual(3, len(wildcard_runs))
def test_listen_once_event(self):
""" Test listen_once_event method. """
runs = []
self.hass.listen_once_event('test_event', lambda x: runs.append(1))
self.hass.bus.fire('test_event')
self.hass._pool.block_till_done()
self.assertEqual(1, len(runs))
# Second time it should not increase runs
self.hass.bus.fire('test_event')
self.hass._pool.block_till_done()
self.assertEqual(1, len(runs))
def test_track_point_in_time(self):
""" Test track point in time. """
before_birthday = datetime(1985, 7, 9, 12, 0, 0)
birthday_paulus = datetime(1986, 7, 9, 12, 0, 0)
after_birthday = datetime(1987, 7, 9, 12, 0, 0)
runs = []
self.hass.track_point_in_time(
lambda x: runs.append(1), birthday_paulus)
self._send_time_changed(before_birthday)
self.hass._pool.block_till_done()
self.assertEqual(0, len(runs))
self._send_time_changed(birthday_paulus)
self.hass._pool.block_till_done()
self.assertEqual(1, len(runs))
# A point in time tracker will only fire once, this should do nothing
self._send_time_changed(birthday_paulus)
self.hass._pool.block_till_done()
self.assertEqual(1, len(runs))
self.hass.track_point_in_time(
lambda x: runs.append(1), birthday_paulus)
self._send_time_changed(after_birthday)
self.hass._pool.block_till_done()
self.assertEqual(2, len(runs))
def test_track_time_change(self):
""" Test tracking time change. """
wildcard_runs = []
specific_runs = []
self.hass.track_time_change(lambda x: wildcard_runs.append(1))
self.hass.track_time_change(
lambda x: specific_runs.append(1), second=[0, 30])
self._send_time_changed(datetime(2014, 5, 24, 12, 0, 0))
self.hass._pool.block_till_done()
self.assertEqual(1, len(specific_runs))
self.assertEqual(1, len(wildcard_runs))
self._send_time_changed(datetime(2014, 5, 24, 12, 0, 15))
self.hass._pool.block_till_done()
self.assertEqual(1, len(specific_runs))
self.assertEqual(2, len(wildcard_runs))
self._send_time_changed(datetime(2014, 5, 24, 12, 0, 30))
self.hass._pool.block_till_done()
self.assertEqual(2, len(specific_runs))
self.assertEqual(3, len(wildcard_runs))
def _send_time_changed(self, now):
""" Send a time changed event. """
self.hass.bus.fire(ha.EVENT_TIME_CHANGED, {ha.ATTR_NOW: now})
class TestEvent(unittest.TestCase):
""" Test Event class. """
def test_repr(self):
""" Test that repr method works. #MoreCoverage """
self.assertEqual(
"<Event TestEvent[L]>",
str(ha.Event("TestEvent")))
self.assertEqual(
"<Event TestEvent[R]: beer=nice>",
str(ha.Event("TestEvent",
{"beer": "nice"},
ha.EventOrigin.remote)))
class TestEventBus(unittest.TestCase):
""" Test EventBus methods. """
def setUp(self): # pylint: disable=invalid-name
""" things to be run when tests are started. """
self.bus = ha.EventBus()
self.bus.listen('test_event', lambda x: len)
def tearDown(self): # pylint: disable=invalid-name
""" Stop down stuff we started. """
self.bus._pool.stop()
def test_add_remove_listener(self):
""" Test remove_listener method. """
old_count = len(self.bus.listeners)
listener = lambda x: len
self.bus.listen('test', listener)
self.assertEqual(old_count + 1, len(self.bus.listeners))
# Try deleting a non registered listener, nothing should happen
self.bus.remove_listener('test', lambda x: len)
# Remove listener
self.bus.remove_listener('test', listener)
self.assertEqual(old_count, len(self.bus.listeners))
# Try deleting listener while category doesn't exist either
self.bus.remove_listener('test', listener)
class TestState(unittest.TestCase):
""" Test EventBus methods. """
def test_init(self):
""" Test state.init """
self.assertRaises(
ha.InvalidEntityFormatError, ha.State,
'invalid_entity_format', 'test_state')
def test_repr(self):
""" Test state.repr """
self.assertEqual("<state on @ 12:00:00 08-12-1984>",
str(ha.State(
"happy.happy", "on",
last_changed=datetime(1984, 12, 8, 12, 0, 0))))
self.assertEqual("<state on:brightness=144 @ 12:00:00 08-12-1984>",
str(ha.State("happy.happy", "on", {"brightness": 144},
datetime(1984, 12, 8, 12, 0, 0))))
class TestStateMachine(unittest.TestCase):
""" Test EventBus methods. """
def setUp(self): # pylint: disable=invalid-name
""" things to be run when tests are started. """
self.bus = ha.EventBus()
self.states = ha.StateMachine(self.bus)
self.states.set("light.Bowl", "on")
self.states.set("switch.AC", "off")
def tearDown(self): # pylint: disable=invalid-name
""" Stop down stuff we started. """
self.bus._pool.stop()
def test_is_state(self):
""" Test is_state method. """
self.assertTrue(self.states.is_state('light.Bowl', 'on'))
self.assertFalse(self.states.is_state('light.Bowl', 'off'))
self.assertFalse(self.states.is_state('light.Non_existing', 'on'))
def test_remove(self):
""" Test remove method. """
self.assertTrue('light.Bowl' in self.states.entity_ids)
self.assertTrue(self.states.remove('light.Bowl'))
self.assertFalse('light.Bowl' in self.states.entity_ids)
# If it does not exist, we should get False
self.assertFalse(self.states.remove('light.Bowl'))
class TestServiceCall(unittest.TestCase):
""" Test ServiceCall class. """
def test_repr(self):
""" Test repr method. """
self.assertEqual(
"<ServiceCall homeassistant.start>",
str(ha.ServiceCall('homeassistant', 'start')))
self.assertEqual(
"<ServiceCall homeassistant.start: fast=yes>",
str(ha.ServiceCall('homeassistant', 'start', {"fast": "yes"})))
class TestServiceRegistry(unittest.TestCase):
""" Test EventBus methods. """
def setUp(self): # pylint: disable=invalid-name
""" things to be run when tests are started. """
self.pool = ha.create_worker_pool()
self.bus = ha.EventBus(self.pool)
self.services = ha.ServiceRegistry(self.bus, self.pool)
self.services.register("test_domain", "test_service", lambda x: len)
def tearDown(self): # pylint: disable=invalid-name
""" Stop down stuff we started. """
self.pool.stop()
def test_has_service(self):
""" Test has_service method. """
self.assertTrue(
self.services.has_service("test_domain", "test_service"))

39
test/test_loader.py Normal file
View file

@ -0,0 +1,39 @@
"""
test.test_loader
~~~~~~~~~~~~~~~~~~
Provides tests to verify that we can load components.
"""
# pylint: disable=too-many-public-methods,protected-access
import unittest
import homeassistant as ha
import homeassistant.loader as loader
import homeassistant.components.http as http
import mock_toggledevice_platform
from helper import get_test_home_assistant
class TestLoader(unittest.TestCase):
""" Test the loader module. """
def setUp(self): # pylint: disable=invalid-name
self.hass = get_test_home_assistant()
loader.prepare(self.hass)
def tearDown(self): # pylint: disable=invalid-name
""" Stop down stuff we started. """
self.hass._pool.stop()
def test_set_component(self):
""" Test if set_component works. """
loader.set_component('switch.test', mock_toggledevice_platform)
self.assertEqual(
mock_toggledevice_platform, loader.get_component('switch.test'))
def test_get_component(self):
""" Test if get_component works. """
self.assertEqual(http, loader.get_component('http'))
self.assertIsNotNone(loader.get_component('custom_one'))

201
test/test_remote.py Normal file
View file

@ -0,0 +1,201 @@
"""
test.remote
~~~~~~~~~~~
Tests Home Assistant remote methods and classes.
"""
# pylint: disable=protected-access,too-many-public-methods
import unittest
import homeassistant as ha
import homeassistant.remote as remote
import homeassistant.components.http as http
API_PASSWORD = "test1234"
HTTP_BASE_URL = "http://127.0.0.1:{}".format(remote.SERVER_PORT)
HA_HEADERS = {remote.AUTH_HEADER: API_PASSWORD}
hass, slave, master_api = None, None, None
def _url(path=""):
""" Helper method to generate urls. """
return HTTP_BASE_URL + path
def setUpModule(): # pylint: disable=invalid-name
""" Initalizes a Home Assistant server and Slave instance. """
global hass, slave, master_api
hass = ha.HomeAssistant()
hass.bus.listen('test_event', lambda _: _)
hass.states.set('test.test', 'a_state')
http.setup(hass,
{http.DOMAIN: {http.CONF_API_PASSWORD: API_PASSWORD}})
hass.start()
master_api = remote.API("127.0.0.1", API_PASSWORD)
# Start slave
local_api = remote.API("127.0.0.1", API_PASSWORD, 8124)
slave = remote.HomeAssistant(master_api, local_api)
http.setup(slave,
{http.DOMAIN: {http.CONF_API_PASSWORD: API_PASSWORD,
http.CONF_SERVER_PORT: 8124}})
slave.start()
def tearDownModule(): # pylint: disable=invalid-name
""" Stops the Home Assistant server and slave. """
global hass, slave
hass.stop()
slave.stop()
class TestRemoteMethods(unittest.TestCase):
""" Test the homeassistant.remote module. """
def test_validate_api(self):
""" Test Python API validate_api. """
self.assertEqual(remote.APIStatus.OK, remote.validate_api(master_api))
self.assertEqual(remote.APIStatus.INVALID_PASSWORD,
remote.validate_api(
remote.API("127.0.0.1", API_PASSWORD + "A")))
def test_get_event_listeners(self):
""" Test Python API get_event_listeners. """
local_data = hass.bus.listeners
remote_data = remote.get_event_listeners(master_api)
for event in remote_data:
self.assertEqual(local_data.pop(event["event"]),
event["listener_count"])
self.assertEqual(len(local_data), 0)
def test_fire_event(self):
""" Test Python API fire_event. """
test_value = []
def listener(event): # pylint: disable=unused-argument
""" Helper method that will verify our event got called. """
test_value.append(1)
hass.listen_once_event("test.event_no_data", listener)
remote.fire_event(master_api, "test.event_no_data")
hass._pool.block_till_done()
self.assertEqual(1, len(test_value))
def test_get_state(self):
""" Test Python API get_state. """
self.assertEqual(
hass.states.get('test.test'),
remote.get_state(master_api, 'test.test'))
def test_get_states(self):
""" Test Python API get_state_entity_ids. """
self.assertEqual(
remote.get_states(master_api), hass.states.all())
def test_set_state(self):
""" Test Python API set_state. """
self.assertTrue(remote.set_state(master_api, 'test.test', 'set_test'))
self.assertEqual('set_test', hass.states.get('test.test').state)
def test_is_state(self):
""" Test Python API is_state. """
self.assertTrue(
remote.is_state(master_api, 'test.test',
hass.states.get('test.test').state))
def test_get_services(self):
""" Test Python API get_services. """
local_services = hass.services.services
for serv_domain in remote.get_services(master_api):
local = local_services.pop(serv_domain["domain"])
self.assertEqual(local, serv_domain["services"])
def test_call_service(self):
""" Test Python API call_service. """
test_value = []
def listener(service_call): # pylint: disable=unused-argument
""" Helper method that will verify that our service got called. """
test_value.append(1)
hass.services.register("test_domain", "test_service", listener)
remote.call_service(master_api, "test_domain", "test_service")
hass._pool.block_till_done()
self.assertEqual(1, len(test_value))
class TestRemoteClasses(unittest.TestCase):
""" Test the homeassistant.remote module. """
def test_home_assistant_init(self):
""" Test HomeAssistant init. """
self.assertRaises(
ha.HomeAssistantError, remote.HomeAssistant,
remote.API('127.0.0.1', API_PASSWORD + 'A', 8124))
def test_statemachine_init(self):
""" Tests if remote.StateMachine copies all states on init. """
self.assertEqual(len(hass.states.all()),
len(slave.states.all()))
for state in hass.states.all():
self.assertEqual(
state, slave.states.get(state.entity_id))
def test_statemachine_set(self):
""" Tests if setting the state on a slave is recorded. """
slave.states.set("remote.test", "remote.statemachine test")
# Wait till slave tells master
slave._pool.block_till_done()
# Wait till master gives updated state
hass._pool.block_till_done()
self.assertEqual("remote.statemachine test",
slave.states.get("remote.test").state)
def test_eventbus_fire(self):
""" Test if events fired from the eventbus get fired. """
test_value = []
def listener(event): # pylint: disable=unused-argument
""" Helper method that will verify our event got called. """
test_value.append(1)
slave.listen_once_event("test.event_no_data", listener)
slave.bus.fire("test.event_no_data")
# Wait till slave tells master
slave._pool.block_till_done()
# Wait till master gives updated event
hass._pool.block_till_done()
self.assertEqual(1, len(test_value))

89
test/test_util.py Normal file
View file

@ -0,0 +1,89 @@
"""
test.test_util
~~~~~~~~~~~~~~
Tests Home Assistant util methods.
"""
# pylint: disable=too-many-public-methods
import unittest
from datetime import datetime
import homeassistant.util as util
class TestUtil(unittest.TestCase):
""" Tests util methods. """
def test_sanitize_filename(self):
""" Test sanitize_filename. """
self.assertEqual("test", util.sanitize_filename("test"))
self.assertEqual("test", util.sanitize_filename("/test"))
self.assertEqual("test", util.sanitize_filename("..test"))
self.assertEqual("test", util.sanitize_filename("\\test"))
self.assertEqual("test", util.sanitize_filename("\\../test"))
def test_sanitize_path(self):
""" Test sanitize_path. """
self.assertEqual("test/path", util.sanitize_path("test/path"))
self.assertEqual("test/path", util.sanitize_path("~test/path"))
self.assertEqual("//test/path",
util.sanitize_path("~/../test/path"))
def test_slugify(self):
""" Test slugify. """
self.assertEqual("Test", util.slugify("T-!@#$!#@$!$est"))
self.assertEqual("Test_More", util.slugify("Test More"))
self.assertEqual("Test_More", util.slugify("Test_(More)"))
def test_datetime_to_str(self):
""" Test datetime_to_str. """
self.assertEqual("12:00:00 09-07-1986",
util.datetime_to_str(datetime(1986, 7, 9, 12, 0, 0)))
def test_str_to_datetime(self):
""" Test str_to_datetime. """
self.assertEqual(datetime(1986, 7, 9, 12, 0, 0),
util.str_to_datetime("12:00:00 09-07-1986"))
def test_split_entity_id(self):
""" Test split_entity_id. """
self.assertEqual(['domain', 'object_id'],
util.split_entity_id('domain.object_id'))
def test_repr_helper(self):
""" Test repr_helper. """
self.assertEqual("A", util.repr_helper("A"))
self.assertEqual("5", util.repr_helper(5))
self.assertEqual("True", util.repr_helper(True))
self.assertEqual("test=1",
util.repr_helper({"test": 1}))
self.assertEqual("12:00:00 09-07-1986",
util.repr_helper(datetime(1986, 7, 9, 12, 0, 0)))
# pylint: disable=invalid-name
def test_color_RGB_to_xy(self):
""" Test color_RGB_to_xy. """
self.assertEqual((0, 0), util.color_RGB_to_xy(0, 0, 0))
self.assertEqual((0.3127159072215825, 0.3290014805066623),
util.color_RGB_to_xy(255, 255, 255))
self.assertEqual((0.15001662234042554, 0.060006648936170214),
util.color_RGB_to_xy(0, 0, 255))
self.assertEqual((0.3, 0.6), util.color_RGB_to_xy(0, 255, 0))
self.assertEqual((0.6400744994567747, 0.3299705106316933),
util.color_RGB_to_xy(255, 0, 0))
def test_convert(self):
""" Test convert. """
self.assertEqual(5, util.convert("5", int))
self.assertEqual(5.0, util.convert("5", float))
self.assertEqual(True, util.convert("True", bool))
self.assertEqual(1, util.convert("NOT A NUMBER", int, 1))
self.assertEqual(1, util.convert(None, int, 1))
def test_ensure_unique_string(self):
""" Test ensure_unique_string. """
self.assertEqual(
"Beer_3",
util.ensure_unique_string("Beer", ["Beer", "Beer_2"]))