hass-core/homeassistant/components/netatmo/sensor.py

583 lines
22 KiB
Python

"""Support for the Netatmo Weather Service."""
from datetime import timedelta
import logging
from time import time
import threading
import voluptuous as vol
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import (
CONF_NAME, CONF_MODE, CONF_MONITORED_CONDITIONS,
TEMP_CELSIUS, DEVICE_CLASS_HUMIDITY, DEVICE_CLASS_TEMPERATURE,
DEVICE_CLASS_BATTERY)
from homeassistant.helpers.entity import Entity
import homeassistant.helpers.config_validation as cv
from homeassistant.util import Throttle
from .const import DATA_NETATMO_AUTH
_LOGGER = logging.getLogger(__name__)
CONF_MODULES = 'modules'
CONF_STATION = 'station'
CONF_AREAS = 'areas'
CONF_LAT_NE = 'lat_ne'
CONF_LON_NE = 'lon_ne'
CONF_LAT_SW = 'lat_sw'
CONF_LON_SW = 'lon_sw'
DEFAULT_MODE = 'avg'
MODE_TYPES = {'max', 'avg'}
DEFAULT_NAME_PUBLIC = 'Netatmo Public Data'
# This is the Netatmo data upload interval in seconds
NETATMO_UPDATE_INTERVAL = 600
# NetAtmo Public Data is uploaded to server every 10 minutes
MIN_TIME_BETWEEN_UPDATES = timedelta(seconds=600)
SUPPORTED_PUBLIC_SENSOR_TYPES = [
'temperature', 'pressure', 'humidity', 'rain', 'windstrength',
'guststrength'
]
SENSOR_TYPES = {
'temperature': ['Temperature', TEMP_CELSIUS, 'mdi:thermometer',
DEVICE_CLASS_TEMPERATURE],
'co2': ['CO2', 'ppm', 'mdi:cloud', None],
'pressure': ['Pressure', 'mbar', 'mdi:gauge', None],
'noise': ['Noise', 'dB', 'mdi:volume-high', None],
'humidity': ['Humidity', '%', 'mdi:water-percent', DEVICE_CLASS_HUMIDITY],
'rain': ['Rain', 'mm', 'mdi:weather-rainy', None],
'sum_rain_1': ['sum_rain_1', 'mm', 'mdi:weather-rainy', None],
'sum_rain_24': ['sum_rain_24', 'mm', 'mdi:weather-rainy', None],
'battery_vp': ['Battery', '', 'mdi:battery', None],
'battery_lvl': ['Battery_lvl', '', 'mdi:battery', None],
'battery_percent': ['battery_percent', '%', None, DEVICE_CLASS_BATTERY],
'min_temp': ['Min Temp.', TEMP_CELSIUS, 'mdi:thermometer', None],
'max_temp': ['Max Temp.', TEMP_CELSIUS, 'mdi:thermometer', None],
'windangle': ['Angle', '', 'mdi:compass', None],
'windangle_value': ['Angle Value', 'º', 'mdi:compass', None],
'windstrength': ['Wind Strength', 'km/h', 'mdi:weather-windy', None],
'gustangle': ['Gust Angle', '', 'mdi:compass', None],
'gustangle_value': ['Gust Angle Value', 'º', 'mdi:compass', None],
'guststrength': ['Gust Strength', 'km/h', 'mdi:weather-windy', None],
'rf_status': ['Radio', '', 'mdi:signal', None],
'rf_status_lvl': ['Radio_lvl', '', 'mdi:signal', None],
'wifi_status': ['Wifi', '', 'mdi:wifi', None],
'wifi_status_lvl': ['Wifi_lvl', 'dBm', 'mdi:wifi', None],
'health_idx': ['Health', '', 'mdi:cloud', None],
}
MODULE_SCHEMA = vol.Schema({
vol.Required(cv.string): vol.All(cv.ensure_list, [vol.In(SENSOR_TYPES)]),
})
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Optional(CONF_STATION): cv.string,
vol.Optional(CONF_MODULES): MODULE_SCHEMA,
vol.Optional(CONF_AREAS): vol.All(cv.ensure_list, [
{
vol.Required(CONF_LAT_NE): cv.latitude,
vol.Required(CONF_LAT_SW): cv.latitude,
vol.Required(CONF_LON_NE): cv.longitude,
vol.Required(CONF_LON_SW): cv.longitude,
vol.Required(CONF_MONITORED_CONDITIONS): [vol.In(
SUPPORTED_PUBLIC_SENSOR_TYPES)],
vol.Optional(CONF_MODE, default=DEFAULT_MODE): vol.In(MODE_TYPES),
vol.Optional(CONF_NAME, default=DEFAULT_NAME_PUBLIC): cv.string
}
]),
})
MODULE_TYPE_OUTDOOR = 'NAModule1'
MODULE_TYPE_WIND = 'NAModule2'
MODULE_TYPE_RAIN = 'NAModule3'
MODULE_TYPE_INDOOR = 'NAModule4'
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Set up the available Netatmo weather sensors."""
dev = []
not_handled = {}
auth = hass.data[DATA_NETATMO_AUTH]
if config.get(CONF_AREAS) is not None:
for area in config[CONF_AREAS]:
data = NetatmoPublicData(
auth,
lat_ne=area[CONF_LAT_NE],
lon_ne=area[CONF_LON_NE],
lat_sw=area[CONF_LAT_SW],
lon_sw=area[CONF_LON_SW]
)
for sensor_type in area[CONF_MONITORED_CONDITIONS]:
dev.append(NetatmoPublicSensor(
area[CONF_NAME],
data,
sensor_type,
area[CONF_MODE]
))
else:
for data_class in all_product_classes():
data = NetatmoData(auth, data_class, config.get(CONF_STATION))
module_items = []
# Test if manually configured
if CONF_MODULES in config:
module_items = config[CONF_MODULES].items()
else:
# otherwise add all modules and conditions
for module_name in data.get_module_names():
monitored_conditions = \
data.station_data.monitoredConditions(module_name)
module_items.append(
(module_name, monitored_conditions))
for module_name, monitored_conditions in module_items:
# Test if module exists
if module_name not in data.get_module_names():
not_handled[module_name] = \
not_handled[module_name]+1 \
if module_name in not_handled else 1
else:
# Only create sensors for monitored properties
for condition in monitored_conditions:
dev.append(NetatmoSensor(
data, module_name, condition.lower()))
for module_name, _ in not_handled.items():
_LOGGER.error('Module name: "%s" not found', module_name)
if dev:
add_entities(dev, True)
def all_product_classes():
"""Provide all handled Netatmo product classes."""
import pyatmo
return [pyatmo.WeatherStationData, pyatmo.HomeCoachData]
class NetatmoSensor(Entity):
"""Implementation of a Netatmo sensor."""
def __init__(self, netatmo_data, module_name, sensor_type):
"""Initialize the sensor."""
self._name = 'Netatmo {} {}'.format(module_name,
SENSOR_TYPES[sensor_type][0])
self.netatmo_data = netatmo_data
self.module_name = module_name
self.type = sensor_type
self._state = None
self._device_class = SENSOR_TYPES[self.type][3]
self._icon = SENSOR_TYPES[self.type][2]
self._unit_of_measurement = SENSOR_TYPES[self.type][1]
self._module_type = self.netatmo_data. \
station_data.moduleByName(module=module_name)['type']
module_id = self.netatmo_data. \
station_data.moduleByName(module=module_name)['_id']
self._unique_id = '{}-{}'.format(module_id, self.type)
@property
def name(self):
"""Return the name of the sensor."""
return self._name
@property
def icon(self):
"""Icon to use in the frontend, if any."""
return self._icon
@property
def device_class(self):
"""Return the device class of the sensor."""
return self._device_class
@property
def state(self):
"""Return the state of the device."""
return self._state
@property
def unit_of_measurement(self):
"""Return the unit of measurement of this entity, if any."""
return self._unit_of_measurement
@property
def unique_id(self):
"""Return the unique ID for this sensor."""
return self._unique_id
def update(self):
"""Get the latest data from Netatmo API and updates the states."""
self.netatmo_data.update()
if self.netatmo_data.data is None:
if self._state is None:
return
_LOGGER.warning("No data found for %s", self.module_name)
self._state = None
return
data = self.netatmo_data.data.get(self.module_name)
if data is None:
_LOGGER.warning("No data found for %s", self.module_name)
self._state = None
return
try:
if self.type == 'temperature':
self._state = round(data['Temperature'], 1)
elif self.type == 'humidity':
self._state = data['Humidity']
elif self.type == 'rain':
self._state = data['Rain']
elif self.type == 'sum_rain_1':
self._state = data['sum_rain_1']
elif self.type == 'sum_rain_24':
self._state = data['sum_rain_24']
elif self.type == 'noise':
self._state = data['Noise']
elif self.type == 'co2':
self._state = data['CO2']
elif self.type == 'pressure':
self._state = round(data['Pressure'], 1)
elif self.type == 'battery_percent':
self._state = data['battery_percent']
elif self.type == 'battery_lvl':
self._state = data['battery_vp']
elif (self.type == 'battery_vp' and
self._module_type == MODULE_TYPE_WIND):
if data['battery_vp'] >= 5590:
self._state = "Full"
elif data['battery_vp'] >= 5180:
self._state = "High"
elif data['battery_vp'] >= 4770:
self._state = "Medium"
elif data['battery_vp'] >= 4360:
self._state = "Low"
elif data['battery_vp'] < 4360:
self._state = "Very Low"
elif (self.type == 'battery_vp' and
self._module_type == MODULE_TYPE_RAIN):
if data['battery_vp'] >= 5500:
self._state = "Full"
elif data['battery_vp'] >= 5000:
self._state = "High"
elif data['battery_vp'] >= 4500:
self._state = "Medium"
elif data['battery_vp'] >= 4000:
self._state = "Low"
elif data['battery_vp'] < 4000:
self._state = "Very Low"
elif (self.type == 'battery_vp' and
self._module_type == MODULE_TYPE_INDOOR):
if data['battery_vp'] >= 5640:
self._state = "Full"
elif data['battery_vp'] >= 5280:
self._state = "High"
elif data['battery_vp'] >= 4920:
self._state = "Medium"
elif data['battery_vp'] >= 4560:
self._state = "Low"
elif data['battery_vp'] < 4560:
self._state = "Very Low"
elif (self.type == 'battery_vp' and
self._module_type == MODULE_TYPE_OUTDOOR):
if data['battery_vp'] >= 5500:
self._state = "Full"
elif data['battery_vp'] >= 5000:
self._state = "High"
elif data['battery_vp'] >= 4500:
self._state = "Medium"
elif data['battery_vp'] >= 4000:
self._state = "Low"
elif data['battery_vp'] < 4000:
self._state = "Very Low"
elif self.type == 'min_temp':
self._state = data['min_temp']
elif self.type == 'max_temp':
self._state = data['max_temp']
elif self.type == 'windangle_value':
self._state = data['WindAngle']
elif self.type == 'windangle':
if data['WindAngle'] >= 330:
self._state = "N (%d\xb0)" % data['WindAngle']
elif data['WindAngle'] >= 300:
self._state = "NW (%d\xb0)" % data['WindAngle']
elif data['WindAngle'] >= 240:
self._state = "W (%d\xb0)" % data['WindAngle']
elif data['WindAngle'] >= 210:
self._state = "SW (%d\xb0)" % data['WindAngle']
elif data['WindAngle'] >= 150:
self._state = "S (%d\xb0)" % data['WindAngle']
elif data['WindAngle'] >= 120:
self._state = "SE (%d\xb0)" % data['WindAngle']
elif data['WindAngle'] >= 60:
self._state = "E (%d\xb0)" % data['WindAngle']
elif data['WindAngle'] >= 30:
self._state = "NE (%d\xb0)" % data['WindAngle']
elif data['WindAngle'] >= 0:
self._state = "N (%d\xb0)" % data['WindAngle']
elif self.type == 'windstrength':
self._state = data['WindStrength']
elif self.type == 'gustangle_value':
self._state = data['GustAngle']
elif self.type == 'gustangle':
if data['GustAngle'] >= 330:
self._state = "N (%d\xb0)" % data['GustAngle']
elif data['GustAngle'] >= 300:
self._state = "NW (%d\xb0)" % data['GustAngle']
elif data['GustAngle'] >= 240:
self._state = "W (%d\xb0)" % data['GustAngle']
elif data['GustAngle'] >= 210:
self._state = "SW (%d\xb0)" % data['GustAngle']
elif data['GustAngle'] >= 150:
self._state = "S (%d\xb0)" % data['GustAngle']
elif data['GustAngle'] >= 120:
self._state = "SE (%d\xb0)" % data['GustAngle']
elif data['GustAngle'] >= 60:
self._state = "E (%d\xb0)" % data['GustAngle']
elif data['GustAngle'] >= 30:
self._state = "NE (%d\xb0)" % data['GustAngle']
elif data['GustAngle'] >= 0:
self._state = "N (%d\xb0)" % data['GustAngle']
elif self.type == 'guststrength':
self._state = data['GustStrength']
elif self.type == 'rf_status_lvl':
self._state = data['rf_status']
elif self.type == 'rf_status':
if data['rf_status'] >= 90:
self._state = "Low"
elif data['rf_status'] >= 76:
self._state = "Medium"
elif data['rf_status'] >= 60:
self._state = "High"
elif data['rf_status'] <= 59:
self._state = "Full"
elif self.type == 'wifi_status_lvl':
self._state = data['wifi_status']
elif self.type == 'wifi_status':
if data['wifi_status'] >= 86:
self._state = "Low"
elif data['wifi_status'] >= 71:
self._state = "Medium"
elif data['wifi_status'] >= 56:
self._state = "High"
elif data['wifi_status'] <= 55:
self._state = "Full"
elif self.type == 'health_idx':
if data['health_idx'] == 0:
self._state = "Healthy"
elif data['health_idx'] == 1:
self._state = "Fine"
elif data['health_idx'] == 2:
self._state = "Fair"
elif data['health_idx'] == 3:
self._state = "Poor"
elif data['health_idx'] == 4:
self._state = "Unhealthy"
except KeyError:
_LOGGER.error("No %s data found for %s", self.type,
self.module_name)
self._state = None
return
class NetatmoPublicSensor(Entity):
"""Represent a single sensor in a Netatmo."""
def __init__(self, area_name, data, sensor_type, mode):
"""Initialize the sensor."""
self.netatmo_data = data
self.type = sensor_type
self._mode = mode
self._name = '{} {}'.format(area_name,
SENSOR_TYPES[self.type][0])
self._area_name = area_name
self._state = None
self._device_class = SENSOR_TYPES[self.type][3]
self._icon = SENSOR_TYPES[self.type][2]
self._unit_of_measurement = SENSOR_TYPES[self.type][1]
@property
def name(self):
"""Return the name of the sensor."""
return self._name
@property
def icon(self):
"""Icon to use in the frontend."""
return self._icon
@property
def device_class(self):
"""Return the device class of the sensor."""
return self._device_class
@property
def state(self):
"""Return the state of the device."""
return self._state
@property
def unit_of_measurement(self):
"""Return the unit of measurement of this entity."""
return self._unit_of_measurement
def update(self):
"""Get the latest data from Netatmo API and updates the states."""
self.netatmo_data.update()
if self.netatmo_data.data is None:
_LOGGER.warning("No data found for %s", self._name)
self._state = None
return
data = None
if self.type == 'temperature':
data = self.netatmo_data.data.getLatestTemperatures()
elif self.type == 'pressure':
data = self.netatmo_data.data.getLatestPressures()
elif self.type == 'humidity':
data = self.netatmo_data.data.getLatestHumidities()
elif self.type == 'rain':
data = self.netatmo_data.data.getLatestRain()
elif self.type == 'windstrength':
data = self.netatmo_data.data.getLatestWindStrengths()
elif self.type == 'guststrength':
data = self.netatmo_data.data.getLatestGustStrengths()
if not data:
_LOGGER.warning("No station provides %s data in the area %s",
self.type, self._area_name)
self._state = None
return
if self._mode == 'avg':
self._state = round(sum(data.values()) / len(data), 1)
elif self._mode == 'max':
self._state = max(data.values())
class NetatmoPublicData:
"""Get the latest data from Netatmo."""
def __init__(self, auth, lat_ne, lon_ne, lat_sw, lon_sw):
"""Initialize the data object."""
self.auth = auth
self.data = None
self.lat_ne = lat_ne
self.lon_ne = lon_ne
self.lat_sw = lat_sw
self.lon_sw = lon_sw
@Throttle(MIN_TIME_BETWEEN_UPDATES)
def update(self):
"""Request an update from the Netatmo API."""
import pyatmo
data = pyatmo.PublicData(self.auth,
LAT_NE=self.lat_ne,
LON_NE=self.lon_ne,
LAT_SW=self.lat_sw,
LON_SW=self.lon_sw,
filtering=True)
if data.CountStationInArea() == 0:
_LOGGER.warning('No Stations available in this area.')
return
self.data = data
class NetatmoData:
"""Get the latest data from Netatmo."""
def __init__(self, auth, data_class, station):
"""Initialize the data object."""
self.auth = auth
self.data_class = data_class
self.data = {}
self.station_data = None
self.station = station
self._next_update = time()
self._update_in_progress = threading.Lock()
def get_module_names(self):
"""Return all module available on the API as a list."""
self.update()
return self.data.keys()
def _detect_platform_type(self):
"""Return the XXXData object corresponding to the specified platform.
The return can be a WeatherStationData or a HomeCoachData.
"""
from pyatmo import NoDevice
try:
station_data = self.data_class(self.auth)
_LOGGER.debug("%s detected!", str(self.data_class.__name__))
return station_data
except NoDevice:
_LOGGER.error("No Weather or HomeCoach devices found for %s", str(
self.station
))
raise
def update(self):
"""Call the Netatmo API to update the data.
This method is not throttled by the builtin Throttle decorator
but with a custom logic, which takes into account the time
of the last update from the cloud.
"""
if time() < self._next_update or \
not self._update_in_progress.acquire(False):
return
from pyatmo import NoDevice
try:
self.station_data = self._detect_platform_type()
except NoDevice:
return
try:
if self.station is not None:
self.data = self.station_data.lastData(
station=self.station, exclude=3600)
else:
self.data = self.station_data.lastData(exclude=3600)
newinterval = 0
try:
for module in self.data:
if 'When' in self.data[module]:
newinterval = self.data[module]['When']
break
except TypeError:
_LOGGER.debug("No %s modules found", self.data_class.__name__)
if newinterval:
# Try and estimate when fresh data will be available
newinterval += NETATMO_UPDATE_INTERVAL - time()
if newinterval > NETATMO_UPDATE_INTERVAL - 30:
newinterval = NETATMO_UPDATE_INTERVAL
else:
if newinterval < NETATMO_UPDATE_INTERVAL / 2:
# Never hammer the Netatmo API more than
# twice per update interval
newinterval = NETATMO_UPDATE_INTERVAL / 2
_LOGGER.info(
"Netatmo refresh interval reset to %d seconds",
newinterval)
else:
# Last update time not found, fall back to default value
newinterval = NETATMO_UPDATE_INTERVAL
self._next_update = time() + newinterval
finally:
self._update_in_progress.release()