Add parameters for enabling Auth in Apache Kafka integration (#39611)

This commit is contained in:
Nishchith Shetty 2020-09-05 14:14:30 +05:30 committed by GitHub
parent 7be5ef8b6c
commit 3565fec005
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -8,7 +8,9 @@ import voluptuous as vol
from homeassistant.const import (
CONF_IP_ADDRESS,
CONF_PASSWORD,
CONF_PORT,
CONF_USERNAME,
EVENT_HOMEASSISTANT_STOP,
EVENT_STATE_CHANGED,
STATE_UNAVAILABLE,
@ -16,6 +18,7 @@ from homeassistant.const import (
)
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entityfilter import FILTER_SCHEMA
from homeassistant.util import ssl as ssl_util
_LOGGER = logging.getLogger(__name__)
@ -23,6 +26,7 @@ DOMAIN = "apache_kafka"
CONF_FILTER = "filter"
CONF_TOPIC = "topic"
CONF_SECURITY_PROTOCOL = "security_protocol"
CONFIG_SCHEMA = vol.Schema(
{
@ -32,6 +36,11 @@ CONFIG_SCHEMA = vol.Schema(
vol.Required(CONF_PORT): cv.port,
vol.Required(CONF_TOPIC): cv.string,
vol.Optional(CONF_FILTER, default={}): FILTER_SCHEMA,
vol.Optional(CONF_SECURITY_PROTOCOL, default="PLAINTEXT"): vol.In(
["PLAINTEXT", "SASL_SSL"]
),
vol.Optional(CONF_USERNAME): cv.string,
vol.Optional(CONF_PASSWORD): cv.string,
}
)
},
@ -49,6 +58,9 @@ async def async_setup(hass, config):
conf[CONF_PORT],
conf[CONF_TOPIC],
conf[CONF_FILTER],
conf[CONF_SECURITY_PROTOCOL],
conf.get(CONF_USERNAME),
conf.get(CONF_PASSWORD),
)
hass.bus.async_listen(EVENT_HOMEASSISTANT_STOP, kafka.shutdown())
@ -74,15 +86,31 @@ class DateTimeJSONEncoder(json.JSONEncoder):
class KafkaManager:
"""Define a manager to buffer events to Kafka."""
def __init__(self, hass, ip_address, port, topic, entities_filter):
def __init__(
self,
hass,
ip_address,
port,
topic,
entities_filter,
security_protocol,
username,
password,
):
"""Initialize."""
self._encoder = DateTimeJSONEncoder()
self._entities_filter = entities_filter
self._hass = hass
ssl_context = ssl_util.client_context()
self._producer = AIOKafkaProducer(
loop=hass.loop,
bootstrap_servers=f"{ip_address}:{port}",
compression_type="gzip",
security_protocol=security_protocol,
ssl_context=ssl_context,
sasl_mechanism="PLAIN",
sasl_plain_username=username,
sasl_plain_password=password,
)
self._topic = topic