hass-core/homeassistant/components/sensor/openhardwaremonitor.py
Wim Haanstra 2f2952e0ec Openhardwaremonitor (#8056)
* Open Hardware Monitor sensor

Platform which is able to connect to the JSON API of Open Hardware Monitor and adds sensors for the devices.

* Remove copyright in header, not needed.

* - Removed old code
- Fixed typos in comments
- Removed log spamming
- Removed code that was unnecessary
- Use requests instead of urllib
- Moved sensor update functionality to data handler, to remove unwanted constructor parameters

* Fixed typo in comment
Added tests

* Added default fixture, to stabilize tests

* - Fix for values deeper than 4 levels, no longer relies on fixed level
- Fixed tests

* Removed timer in favor of helper methods

* Moved update functionality back to Entity….
Updated SCAN INTERVAL

* Added timeout to request
Removed retry when Open Hardware Monitor API is not reachable
Fixed naming of sensors
Flow optimizations
Fixed tests to use states

* Remove unused import
2017-06-25 13:48:05 -07:00

183 lines
5.3 KiB
Python

"""Support for Open Hardware Monitor Sensor Platform."""
from datetime import timedelta
import logging
import requests
import voluptuous as vol
from homeassistant.util.dt import utcnow
from homeassistant.const import CONF_HOST, CONF_PORT
import homeassistant.helpers.config_validation as cv
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.helpers.entity import Entity
from homeassistant.util import Throttle
# Logger named after this module.
_LOGGER = logging.getLogger(__name__)

# Keys read from the nodes of the Open Hardware Monitor JSON tree.
OHM_NAME = 'Text'
OHM_VALUE = 'Value'
OHM_MIN = 'Min'
OHM_MAX = 'Max'
OHM_CHILDREN = 'Children'

# Attribute keys. STATE_MIN_VALUE and STATE_MAX_VALUE are used for the
# sensor attributes; STATE_VALUE and STATE_OBJECT are not referenced in
# the code visible here.
STATE_VALUE = 'value'
STATE_MIN_VALUE = 'minimal_value'
STATE_MAX_VALUE = 'maximum_value'
STATE_OBJECT = 'object'

# Not referenced in the code visible here.
CONF_INTERVAL = 'interval'

# Timing constants. MIN_TIME_BETWEEN_UPDATES is the argument given to the
# Throttle decorator on the data handler's update method.
# NOTE(review): SCAN_INTERVAL and RETRY_INTERVAL are not referenced in the
# code visible here; SCAN_INTERVAL is presumably read by Home Assistant
# itself - confirm before removing either.
MIN_TIME_BETWEEN_UPDATES = timedelta(seconds=15)
SCAN_INTERVAL = timedelta(seconds=30)
RETRY_INTERVAL = timedelta(seconds=30)

# Platform configuration: the host is mandatory, the port defaults to 8085.
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
    vol.Required(CONF_HOST): cv.string,
    vol.Optional(CONF_PORT, default=8085): cv.port,
})
def setup_platform(hass, config, add_devices, discovery_info=None):
    """Set up the Open Hardware Monitor platform.

    Creates the data handler from the platform configuration and hands
    the devices it discovered to ``add_devices``. ``discovery_info`` is
    accepted but not used.
    """
    # The second positional argument is passed as True.
    # NOTE(review): presumably Home Assistant's "update before add" flag -
    # confirm against the add_devices callback.
    add_devices(OpenHardwareMonitorData(config, hass).devices, True)
class OpenHardwareMonitorDevice(Entity):
    """Sensor entity showing one value reported by OpenHardwareMonitor."""

    def __init__(self, data, name, path, unit_of_measurement):
        """Initialize an OpenHardwareMonitor sensor.

        ``data`` is the shared data handler whose ``data`` attribute holds
        the parsed JSON. ``path`` is the sequence of child indexes that
        ``update`` follows through the nested ``Children`` lists to reach
        this sensor's node.
        """
        self._data = data
        self._name = name
        self._unit_of_measurement = unit_of_measurement
        self.path = path
        self.value = None
        self.attributes = {}

    @property
    def name(self):
        """Return the name of the sensor."""
        return self._name

    @property
    def unit_of_measurement(self):
        """Return the unit of measurement of the sensor."""
        return self._unit_of_measurement

    @property
    def state(self):
        """Return the most recently read value of the sensor."""
        return self.value

    @property
    def state_attributes(self):
        """Return the state attributes of the sensor."""
        return self.attributes

    def update(self):
        """Refresh the value and attributes from the handler's JSON data.

        Walks ``self.path`` through the JSON tree. Every intermediate node
        contributes a ``level_<n>`` attribute holding its name; the final
        node provides the value and the name/min/max attributes.
        """
        self._data.update()

        children = self._data.data[OHM_CHILDREN]
        collected = {}
        last_level = len(self.path) - 1

        for level, child_index in enumerate(self.path):
            node = children[child_index]

            if level != last_level:
                # Intermediate node: descend and remember its name.
                children = node[OHM_CHILDREN]
                collected['level_%s' % level] = node[OHM_NAME]
                continue

            # Final node: readings look like "<number> <unit>", only the
            # part before the first space is kept.
            self.value = node[OHM_VALUE].split(' ')[0]
            collected.update({
                'name': node[OHM_NAME],
                STATE_MIN_VALUE: node[OHM_MIN].split(' ')[0],
                STATE_MAX_VALUE: node[OHM_MAX].split(' ')[0]
            })
            self.attributes = collected
            return
class OpenHardwareMonitorData(object):
    """Class used to pull data from OHM and create sensors."""

    def __init__(self, config, hass):
        """Initialize the Open Hardware Monitor data-handler.

        Performs a first download right away; ``devices`` stays empty and
        ``data`` stays ``None`` when that download fails.
        """
        self.data = None
        self._config = config
        self._hass = hass
        self.devices = []
        self.initialize(utcnow())

    @Throttle(MIN_TIME_BETWEEN_UPDATES)
    def update(self):
        """Fetch fresh data, (re)discovering devices if none was loaded yet.

        Wrapped in ``Throttle(MIN_TIME_BETWEEN_UPDATES)``.
        """
        if self.data is None:
            # No successful download so far: retry the full initialization.
            # Devices found here are only stored on ``self.devices``.
            self.initialize(utcnow())
        else:
            self.refresh()

    def refresh(self):
        """Download and parse JSON from OHM.

        On success ``self.data`` is replaced by the parsed document. On any
        request failure or unparsable response an error is logged and the
        previous ``self.data`` is left untouched.
        """
        data_url = "http://%s:%d/data.json" % (
            self._config.get(CONF_HOST),
            self._config.get(CONF_PORT))

        try:
            response = requests.get(data_url, timeout=30)
            self.data = response.json()
        except requests.exceptions.ConnectionError:
            _LOGGER.error("ConnectionError: Is OpenHardwareMonitor running?")
        except (requests.exceptions.RequestException, ValueError) as err:
            # A read timeout is not a ConnectionError and invalid JSON
            # surfaces as ValueError; neither should escape the update.
            _LOGGER.error(
                "Unable to read data from OpenHardwareMonitor at %s: %s",
                data_url, err)

    def initialize(self, now):
        """Initial parsing of the sensors and adding of devices.

        ``now`` is accepted but not used.
        """
        self.refresh()

        if self.data is None:
            return

        self.devices = self.parse_children(self.data, [], [], [])

    def parse_children(self, json, devices, path, names):
        """Recursively loop through child objects, finding the values.

        ``json`` is the node to inspect, ``devices`` the devices collected
        so far, ``path`` the child indexes leading to ``json`` and ``names``
        the names of its ancestors. Returns a new list holding ``devices``
        followed by every device found at or below ``json``; none of the
        arguments is modified.
        """
        result = devices.copy()

        children = json[OHM_CHILDREN]
        if children:
            # Names handed to the children; the root node (empty path) does
            # not contribute its own name.
            child_names = names.copy()
            if path:
                child_names.append(json[OHM_NAME])

            for child_index, child in enumerate(children):
                # Recurse with an empty accumulator so that the devices
                # already in ``result`` are not repeated once per child.
                result.extend(self.parse_children(
                    child, [], path + [child_index], child_names))
            return result

        # Leaf node: only values of the form "<number> <unit>" are sensors.
        value = json[OHM_VALUE]
        if ' ' not in value:
            return result

        unit_of_measurement = value.split(' ')[1]
        fullname = ' '.join(names + [json[OHM_NAME]])

        result.append(OpenHardwareMonitorDevice(
            self,
            fullname,
            path,
            unit_of_measurement
        ))
        return result