Add metrics upload by UDP to graphite (#43751)
This commit is contained in:
parent
20485eb132
commit
642bb91a9a
2 changed files with 48 additions and 19 deletions
|
@ -12,6 +12,7 @@ from homeassistant.const import (
|
||||||
CONF_HOST,
|
CONF_HOST,
|
||||||
CONF_PORT,
|
CONF_PORT,
|
||||||
CONF_PREFIX,
|
CONF_PREFIX,
|
||||||
|
CONF_PROTOCOL,
|
||||||
EVENT_HOMEASSISTANT_START,
|
EVENT_HOMEASSISTANT_START,
|
||||||
EVENT_HOMEASSISTANT_STOP,
|
EVENT_HOMEASSISTANT_STOP,
|
||||||
EVENT_STATE_CHANGED,
|
EVENT_STATE_CHANGED,
|
||||||
|
@ -21,8 +22,11 @@ import homeassistant.helpers.config_validation as cv
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
PROTOCOL_TCP = "tcp"
|
||||||
|
PROTOCOL_UDP = "udp"
|
||||||
DEFAULT_HOST = "localhost"
|
DEFAULT_HOST = "localhost"
|
||||||
DEFAULT_PORT = 2003
|
DEFAULT_PORT = 2003
|
||||||
|
DEFAULT_PROTOCOL = PROTOCOL_TCP
|
||||||
DEFAULT_PREFIX = "ha"
|
DEFAULT_PREFIX = "ha"
|
||||||
DOMAIN = "graphite"
|
DOMAIN = "graphite"
|
||||||
|
|
||||||
|
@ -32,6 +36,9 @@ CONFIG_SCHEMA = vol.Schema(
|
||||||
{
|
{
|
||||||
vol.Optional(CONF_HOST, default=DEFAULT_HOST): cv.string,
|
vol.Optional(CONF_HOST, default=DEFAULT_HOST): cv.string,
|
||||||
vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
|
vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
|
||||||
|
vol.Optional(CONF_PROTOCOL, default=DEFAULT_PROTOCOL): vol.Any(
|
||||||
|
PROTOCOL_TCP, PROTOCOL_UDP
|
||||||
|
),
|
||||||
vol.Optional(CONF_PREFIX, default=DEFAULT_PREFIX): cv.string,
|
vol.Optional(CONF_PREFIX, default=DEFAULT_PREFIX): cv.string,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
@ -46,29 +53,34 @@ def setup(hass, config):
|
||||||
host = conf.get(CONF_HOST)
|
host = conf.get(CONF_HOST)
|
||||||
prefix = conf.get(CONF_PREFIX)
|
prefix = conf.get(CONF_PREFIX)
|
||||||
port = conf.get(CONF_PORT)
|
port = conf.get(CONF_PORT)
|
||||||
|
protocol = conf.get(CONF_PROTOCOL)
|
||||||
|
|
||||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
if protocol == PROTOCOL_TCP:
|
||||||
try:
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
sock.connect((host, port))
|
try:
|
||||||
sock.shutdown(2)
|
sock.connect((host, port))
|
||||||
_LOGGER.debug("Connection to Graphite possible")
|
sock.shutdown(2)
|
||||||
except OSError:
|
_LOGGER.debug("Connection to Graphite possible")
|
||||||
_LOGGER.error("Not able to connect to Graphite")
|
except OSError:
|
||||||
return False
|
_LOGGER.error("Not able to connect to Graphite")
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
_LOGGER.debug("No connection check for UDP possible")
|
||||||
|
|
||||||
GraphiteFeeder(hass, host, port, prefix)
|
GraphiteFeeder(hass, host, port, protocol, prefix)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
class GraphiteFeeder(threading.Thread):
|
class GraphiteFeeder(threading.Thread):
|
||||||
"""Feed data to Graphite."""
|
"""Feed data to Graphite."""
|
||||||
|
|
||||||
def __init__(self, hass, host, port, prefix):
|
def __init__(self, hass, host, port, protocol, prefix):
|
||||||
"""Initialize the feeder."""
|
"""Initialize the feeder."""
|
||||||
super().__init__(daemon=True)
|
super().__init__(daemon=True)
|
||||||
self._hass = hass
|
self._hass = hass
|
||||||
self._host = host
|
self._host = host
|
||||||
self._port = port
|
self._port = port
|
||||||
|
self._protocol = protocol
|
||||||
# rstrip any trailing dots in case they think they need it
|
# rstrip any trailing dots in case they think they need it
|
||||||
self._prefix = prefix.rstrip(".")
|
self._prefix = prefix.rstrip(".")
|
||||||
self._queue = queue.Queue()
|
self._queue = queue.Queue()
|
||||||
|
@ -101,12 +113,16 @@ class GraphiteFeeder(threading.Thread):
|
||||||
|
|
||||||
def _send_to_graphite(self, data):
|
def _send_to_graphite(self, data):
|
||||||
"""Send data to Graphite."""
|
"""Send data to Graphite."""
|
||||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
if self._protocol == PROTOCOL_TCP:
|
||||||
sock.settimeout(10)
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
sock.connect((self._host, self._port))
|
sock.settimeout(10)
|
||||||
sock.sendall(data.encode("ascii"))
|
sock.connect((self._host, self._port))
|
||||||
sock.send(b"\n")
|
sock.sendall(data.encode("ascii"))
|
||||||
sock.close()
|
sock.send(b"\n")
|
||||||
|
sock.close()
|
||||||
|
else:
|
||||||
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
|
sock.sendto(data.encode("ascii") + b"\n", (self._host, self._port))
|
||||||
|
|
||||||
def _report_attributes(self, entity_id, new_state):
|
def _report_attributes(self, entity_id, new_state):
|
||||||
"""Report the attributes."""
|
"""Report the attributes."""
|
||||||
|
|
|
@ -24,7 +24,7 @@ class TestGraphite(unittest.TestCase):
|
||||||
def setup_method(self, method):
|
def setup_method(self, method):
|
||||||
"""Set up things to be run when tests are started."""
|
"""Set up things to be run when tests are started."""
|
||||||
self.hass = get_test_home_assistant()
|
self.hass = get_test_home_assistant()
|
||||||
self.gf = graphite.GraphiteFeeder(self.hass, "foo", 123, "ha")
|
self.gf = graphite.GraphiteFeeder(self.hass, "foo", 123, "tcp", "ha")
|
||||||
|
|
||||||
def teardown_method(self, method):
|
def teardown_method(self, method):
|
||||||
"""Stop everything that was started."""
|
"""Stop everything that was started."""
|
||||||
|
@ -45,10 +45,23 @@ class TestGraphite(unittest.TestCase):
|
||||||
|
|
||||||
assert setup_component(self.hass, graphite.DOMAIN, config)
|
assert setup_component(self.hass, graphite.DOMAIN, config)
|
||||||
assert mock_gf.call_count == 1
|
assert mock_gf.call_count == 1
|
||||||
assert mock_gf.call_args == mock.call(self.hass, "foo", 123, "me")
|
assert mock_gf.call_args == mock.call(self.hass, "foo", 123, "tcp", "me")
|
||||||
assert mock_socket.call_count == 1
|
assert mock_socket.call_count == 1
|
||||||
assert mock_socket.call_args == mock.call(socket.AF_INET, socket.SOCK_STREAM)
|
assert mock_socket.call_args == mock.call(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
|
||||||
|
@patch("socket.socket")
|
||||||
|
@patch("homeassistant.components.graphite.GraphiteFeeder")
|
||||||
|
def test_full_udp_config(self, mock_gf, mock_socket):
|
||||||
|
"""Test setup with full configuration and UDP protocol."""
|
||||||
|
config = {
|
||||||
|
"graphite": {"host": "foo", "port": 123, "protocol": "udp", "prefix": "me"}
|
||||||
|
}
|
||||||
|
|
||||||
|
assert setup_component(self.hass, graphite.DOMAIN, config)
|
||||||
|
assert mock_gf.call_count == 1
|
||||||
|
assert mock_gf.call_args == mock.call(self.hass, "foo", 123, "udp", "me")
|
||||||
|
assert mock_socket.call_count == 0
|
||||||
|
|
||||||
@patch("socket.socket")
|
@patch("socket.socket")
|
||||||
@patch("homeassistant.components.graphite.GraphiteFeeder")
|
@patch("homeassistant.components.graphite.GraphiteFeeder")
|
||||||
def test_config_port(self, mock_gf, mock_socket):
|
def test_config_port(self, mock_gf, mock_socket):
|
||||||
|
@ -63,7 +76,7 @@ class TestGraphite(unittest.TestCase):
|
||||||
def test_subscribe(self):
|
def test_subscribe(self):
|
||||||
"""Test the subscription."""
|
"""Test the subscription."""
|
||||||
fake_hass = mock.MagicMock()
|
fake_hass = mock.MagicMock()
|
||||||
gf = graphite.GraphiteFeeder(fake_hass, "foo", 123, "ha")
|
gf = graphite.GraphiteFeeder(fake_hass, "foo", 123, "tcp", "ha")
|
||||||
fake_hass.bus.listen_once.has_calls(
|
fake_hass.bus.listen_once.has_calls(
|
||||||
[
|
[
|
||||||
mock.call(EVENT_HOMEASSISTANT_START, gf.start_listen),
|
mock.call(EVENT_HOMEASSISTANT_START, gf.start_listen),
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue