Light color/brightness now exposed and controllable

This commit is contained in:
Paulus Schoutsen 2014-03-16 15:00:59 -07:00
parent 8154e75069
commit bb771d802d
3 changed files with 285 additions and 86 deletions

View file

@ -15,6 +15,8 @@ from . import light, sun, device_tracker, group
LIGHT_TRANSITION_TIME = timedelta(minutes=15)
LIGHT_BRIGHTNESS = 164
LIGHT_XY_COLOR = [0.5119, 0.4147]
# pylint: disable=too-many-branches
@ -64,7 +66,10 @@ def setup(bus, statemachine, light_group=None):
if (device_tracker.is_on(statemachine) and
not light.is_on(statemachine, light_id)):
light.turn_on(bus, light_id, LIGHT_TRANSITION_TIME.seconds)
light.turn_on(bus, light_id,
transition=LIGHT_TRANSITION_TIME.seconds,
brightness=LIGHT_BRIGHTNESS,
xy_color=LIGHT_XY_COLOR)
def turn_on(light_id):
""" Lambda can keep track of function parameters but not local
@ -115,7 +120,9 @@ def setup(bus, statemachine, light_group=None):
# Turn on lights directly instead of calling group.turn_on
# So we skip fetching the entity ids again.
for light_id in light_ids:
light.turn_on(bus, light_id)
light.turn_on(bus, light_id,
brightness=LIGHT_BRIGHTNESS,
xy_color=LIGHT_XY_COLOR)
# Are we in the time span were we would turn on the lights
# if someone would be home?

View file

@ -8,6 +8,7 @@ Provides functionality to interact with lights.
import logging
import socket
from datetime import datetime, timedelta
from collections import namedtuple
import homeassistant as ha
import homeassistant.util as util
@ -26,6 +27,16 @@ ENTITY_ID_FORMAT = DOMAIN + ".{}"
MIN_TIME_BETWEEN_SCANS = timedelta(seconds=10)
# integer that represents transition time in seconds to make change
ATTR_TRANSITION = "transition"
# lists holding color values
ATTR_RGB_COLOR = "rgb_color"
ATTR_XY_COLOR = "xy_color"
# int with value 0 .. 255 representing brightness of the light
ATTR_BRIGHTNESS = "brightness"
def is_on(statemachine, entity_id=None):
""" Returns if the lights are on based on the statemachine. """
@ -34,32 +45,44 @@ def is_on(statemachine, entity_id=None):
return statemachine.is_state(entity_id, STATE_ON)
def turn_on(bus, entity_id=None, transition_seconds=None):
# pylint: disable=too-many-arguments
def turn_on(bus, entity_id=None, transition=None, brightness=None,
rgb_color=None, xy_color=None):
""" Turns all or specified light on. """
data = {}
if entity_id:
data[ATTR_ENTITY_ID] = entity_id
if transition_seconds:
data["transition_seconds"] = transition_seconds
if transition is not None:
data[ATTR_TRANSITION] = transition
if brightness is not None:
data[ATTR_BRIGHTNESS] = brightness
if rgb_color is not None:
data[ATTR_RGB_COLOR] = rgb_color
if xy_color is not None:
data[ATTR_XY_COLOR] = xy_color
bus.call_service(DOMAIN, SERVICE_TURN_ON, data)
def turn_off(bus, entity_id=None, transition_seconds=None):
def turn_off(bus, entity_id=None, transition=None):
""" Turns all or specified light off. """
data = {}
if entity_id:
data[ATTR_ENTITY_ID] = entity_id
if transition_seconds:
data["transition_seconds"] = transition_seconds
if transition is not None:
data[ATTR_TRANSITION] = transition
bus.call_service(DOMAIN, SERVICE_TURN_OFF, data)
# pylint: disable=too-many-branches
def setup(bus, statemachine, light_control):
""" Exposes light control via statemachine and services. """
@ -68,47 +91,68 @@ def setup(bus, statemachine, light_control):
ent_to_light = {}
light_to_ent = {}
def update_light_state(time): # pylint: disable=unused-argument
""" Track the state of the lights. """
try:
should_update = datetime.now() - update_light_state.last_updated \
> MIN_TIME_BETWEEN_SCANS
except AttributeError: # if last_updated does not exist
should_update = True
if should_update:
logger.info("Updating light status")
update_light_state.last_updated = datetime.now()
names = None
states = light_control.get_states()
for light_id, is_light_on in states.items():
def _update_light_state(light_id, light_state):
""" Update statemachine based on the LightState passed in. """
try:
entity_id = light_to_ent[light_id]
except KeyError:
# We have not seen this light before, set it up
# Load light names if not loaded this update call
if names is None:
names = light_control.get_names()
# Get name and create entity id
name = light_control.get_name(light_id) or "Unknown Light"
name = names.get(
light_id, "Unknown Light {}".format(len(ent_to_light)))
logger.info("Found new light {}".format(name))
logger.info(u"Found new light {}".format(name))
entity_id = ENTITY_ID_FORMAT.format(util.slugify(name))
# Ensure unique entity id
tries = 1
while entity_id in ent_to_light:
tries += 1
entity_id = ENTITY_ID_FORMAT.format(
util.slugify("{} {}".format(name, tries)))
ent_to_light[entity_id] = light_id
light_to_ent[light_id] = entity_id
statemachine.set_state(entity_id,
STATE_ON if is_light_on else STATE_OFF)
state_attr = {}
if light_state.on:
state = STATE_ON
if light_state.brightness:
state_attr[ATTR_BRIGHTNESS] = light_state.brightness
if light_state.color:
state_attr[ATTR_XY_COLOR] = light_state.color
else:
state = STATE_OFF
statemachine.set_state(entity_id, state, state_attr)
def update_light_state(light_id):
""" Update the state of specified light. """
_update_light_state(light_id, light_control.get_state(light_id))
# pylint: disable=unused-argument
def update_lights_state(time, force_reload=False):
""" Update the state of all the lights. """
# First time this method gets called, force_reload should be True
if (force_reload or
datetime.now() - update_lights_state.last_updated >
MIN_TIME_BETWEEN_SCANS):
logger.info("Updating light status")
update_lights_state.last_updated = datetime.now()
for light_id, light_state in light_control.get_states().items():
_update_light_state(light_id, light_state)
# Update light state and discover lights for tracking the group
update_light_state(None)
update_lights_state(None, True)
# Track all lights in a group
group.setup(bus, statemachine,
@ -116,20 +160,65 @@ def setup(bus, statemachine, light_control):
def handle_light_service(service):
""" Hande a turn light on or off service call. """
entity_id = service.data.get(ATTR_ENTITY_ID, None)
transition_seconds = service.data.get("transition_seconds", None)
# Get and validate data
dat = service.data
if service.service == SERVICE_TURN_ON:
light_control.turn_light_on(ent_to_light.get(entity_id),
transition_seconds)
if ATTR_ENTITY_ID in dat:
light_id = ent_to_light.get(dat[ATTR_ENTITY_ID])
else:
light_control.turn_light_off(ent_to_light.get(entity_id),
transition_seconds)
light_id = None
update_light_state(None)
transition = util.dict_get_convert(dat, ATTR_TRANSITION, int, None)
if service.service == SERVICE_TURN_OFF:
light_control.turn_light_off(light_id, transition)
else:
# Processing extra data for turn light on request
bright = util.dict_get_convert(dat, ATTR_BRIGHTNESS, int, 164)
color = None
xy_color = dat.get(ATTR_XY_COLOR)
rgb_color = dat.get(ATTR_RGB_COLOR)
if xy_color:
try:
# xy_color should be a list containing 2 floats
xy_color = [float(val) for val in xy_color]
if len(xy_color) == 2:
color = xy_color
except (TypeError, ValueError):
# TypeError if xy_color was not iterable
# ValueError if value could not be converted to float
pass
if not color and rgb_color:
try:
# rgb_color should be a list containing 3 ints
rgb_color = [int(val) for val in rgb_color]
if len(rgb_color) == 3:
color = util.color_RGB_to_xy(rgb_color[0],
rgb_color[1],
rgb_color[2])
except (TypeError, ValueError):
# TypeError if color has no len
# ValueError if not all values convertable to int
color = None
light_control.turn_light_on(light_id, transition, bright, color)
# Update state of lights touched
if light_id:
update_light_state(light_id)
else:
update_lights_state(None, True)
# Update light state every 30 seconds
ha.track_time_change(bus, update_light_state, second=[0, 30])
ha.track_time_change(bus, update_lights_state, second=[0, 30])
# Listen for light on and light off service calls
bus.register_service(DOMAIN, SERVICE_TURN_ON,
@ -141,6 +230,19 @@ def setup(bus, statemachine, light_control):
return True
LightState = namedtuple("LightState", ['on', 'brightness', 'color'])
def _hue_to_light_state(info):
""" Helper method to convert a Hue state to a LightState. """
try:
return LightState(info['state']['reachable'] and info['state']['on'],
info['state']['bri'], info['state']['xy'])
except KeyError:
# KeyError if one of the keys didn't exist
return None
class HueLightControl(object):
""" Class to interface with the Hue light system. """
@ -168,59 +270,101 @@ class HueLightControl(object):
return
if len(self._bridge.get_light()) == 0:
# Dict mapping light_id to name
self._lights = {}
self._update_lights()
if len(self._lights) == 0:
logger.error("HueLightControl:Could not find any lights. ")
self.success_init = False
else:
self.success_init = True
def get_names(self):
""" Return a dict with id mapped to name. """
def _update_lights(self):
""" Helper method to update the known names from Hue. """
try:
return {int(item[0]): item[1]['name'] for item
self._lights = {int(item[0]): item[1]['name'] for item
in self._bridge.get_light().items()}
except (socket.error, KeyError):
# socket.error because sometimes we cannot reach Hue
# KeyError if we got unexpected data
return {}
# We don't do anything, keep old values
pass
def get_name(self, light_id):
""" Return name for specified light_id or None if no name known. """
if not light_id in self._lights:
self._update_lights()
return self._lights.get(light_id)
def get_state(self, light_id):
""" Return a LightState representing light light_id. """
try:
info = self._bridge.get_light(light_id)
return _hue_to_light_state(info)
except socket.error:
# socket.error when we cannot reach Hue
return None
def get_states(self):
""" Return a dict with id mapped to boolean is_on. """
""" Return a dict with id mapped to LightState objects. """
states = {}
try:
# Light is on if reachable and on
return {int(itm[0]):
itm[1]['state']['reachable'] and itm[1]['state']['on']
for itm in self._bridge.get_api()['lights'].items()}
api = self._bridge.get_api()
except (socket.error, KeyError):
# socket.error because sometimes we cannot reach Hue
# KeyError if we got unexpected data
return {}
except socket.error:
# socket.error when we cannot reach Hue
return states
def turn_light_on(self, light_id=None, transition_seconds=None):
api_states = api.get('lights')
if not isinstance(api_states, dict):
return states
for light_id, info in api_states.items():
state = _hue_to_light_state(info)
if state:
states[int(light_id)] = state
return states
def turn_light_on(self, light_id, transition, brightness, xy_color):
""" Turn the specified or all lights on. """
self._turn_light(True, light_id, transition_seconds)
def turn_light_off(self, light_id=None, transition_seconds=None):
""" Turn the specified or all lights off. """
self._turn_light(False, light_id, transition_seconds)
def _turn_light(self, turn, light_id, transition_seconds):
""" Helper method to turn lights on or off. """
if turn:
command = {'on': True, 'xy': [0.5119, 0.4147], 'bri': 164}
else:
command = {'on': False}
if light_id is None:
light_id = [light.light_id for light in self._bridge.lights]
light_id = self._lights.keys()
if transition_seconds is not None:
command = {'on': True}
if transition is not None:
# Transition time is in 1/10th seconds and cannot exceed
# 900 seconds.
command['transitiontime'] = min(9000, transition_seconds * 10)
command['transitiontime'] = min(9000, transition * 10)
if brightness is not None:
command['bri'] = brightness
if xy_color:
command['xy'] = xy_color
self._bridge.set_light(light_id, command)
def turn_light_off(self, light_id, transition):
""" Turn the specified or all lights off. """
if light_id is None:
light_id = self._lights.keys()
command = {'on': False}
if transition is not None:
# Transition time is in 1/10th seconds and cannot exceed
# 900 seconds.
command['transitiontime'] = min(9000, transition * 10)
self._bridge.set_light(light_id, command)

View file

@ -3,7 +3,6 @@ import threading
import Queue
import datetime
import re
import os
RE_SANITIZE_FILENAME = re.compile(r'(~|\.\.|/|\\)')
RE_SLUGIFY = re.compile(r'[^A-Za-z0-9_]+')
@ -63,14 +62,63 @@ def repr_helper(inp):
return u", ".join(
repr_helper(key)+u"="+repr_helper(item) for key, item
in inp.items())
elif isinstance(inp, list):
return u'[' + u', '.join(inp) + u']'
elif isinstance(inp, datetime.datetime):
return datetime_to_str(inp)
else:
return unicode(inp)
# Taken from: http://www.cse.unr.edu/~quiroz/inc/colortransforms.py
# License: Code is given as is. Use at your own risk and discretion.
# pylint: disable=invalid-name
def color_RGB_to_xy(R, G, B):
''' Convert from RGB color to XY color. '''
var_R = (R / 255.)
var_G = (G / 255.)
var_B = (B / 255.)
if var_R > 0.04045:
var_R = ((var_R + 0.055) / 1.055) ** 2.4
else:
var_R /= 12.92
if var_G > 0.04045:
var_G = ((var_G + 0.055) / 1.055) ** 2.4
else:
var_G /= 12.92
if var_B > 0.04045:
var_B = ((var_B + 0.055) / 1.055) ** 2.4
else:
var_B /= 12.92
var_R *= 100
var_G *= 100
var_B *= 100
# Observer. = 2 deg, Illuminant = D65
X = var_R * 0.4124 + var_G * 0.3576 + var_B * 0.1805
Y = var_R * 0.2126 + var_G * 0.7152 + var_B * 0.0722
Z = var_R * 0.0193 + var_G * 0.1192 + var_B * 0.9505
# Convert XYZ to xy, see CIE 1931 color space on wikipedia
return X / (X + Y + Z), Y / (X + Y + Z)
def dict_get_convert(dic, key, value_type, default=None):
""" Get a value from a dic and ensure it is value_type. """
return convert(dic[key], value_type, default) if key in dic else default
def convert(value, to_type, default=None):
""" Converts value to to_type, returns default if fails. """
try:
return to_type(value)
except ValueError:
# If value could not be converted
return default
# Reason why I decided to roll my own ThreadPool instead of using
# multiprocessing.dummy.pool or even better, use multiprocessing.pool and
# not be hurt by the GIL in the cpython interpreter: