diff --git a/homeassistant/components/apache_kafka/__init__.py b/homeassistant/components/apache_kafka/__init__.py index c64e2159977..52f57a4b753 100644 --- a/homeassistant/components/apache_kafka/__init__.py +++ b/homeassistant/components/apache_kafka/__init__.py @@ -8,7 +8,9 @@ import voluptuous as vol from homeassistant.const import ( CONF_IP_ADDRESS, + CONF_PASSWORD, CONF_PORT, + CONF_USERNAME, EVENT_HOMEASSISTANT_STOP, EVENT_STATE_CHANGED, STATE_UNAVAILABLE, @@ -16,6 +18,7 @@ from homeassistant.const import ( ) import homeassistant.helpers.config_validation as cv from homeassistant.helpers.entityfilter import FILTER_SCHEMA +from homeassistant.util import ssl as ssl_util _LOGGER = logging.getLogger(__name__) @@ -23,6 +26,7 @@ DOMAIN = "apache_kafka" CONF_FILTER = "filter" CONF_TOPIC = "topic" +CONF_SECURITY_PROTOCOL = "security_protocol" CONFIG_SCHEMA = vol.Schema( { @@ -32,6 +36,11 @@ CONFIG_SCHEMA = vol.Schema( vol.Required(CONF_PORT): cv.port, vol.Required(CONF_TOPIC): cv.string, vol.Optional(CONF_FILTER, default={}): FILTER_SCHEMA, + vol.Optional(CONF_SECURITY_PROTOCOL, default="PLAINTEXT"): vol.In( + ["PLAINTEXT", "SASL_SSL"] + ), + vol.Optional(CONF_USERNAME): cv.string, + vol.Optional(CONF_PASSWORD): cv.string, } ) }, @@ -49,6 +58,9 @@ async def async_setup(hass, config): conf[CONF_PORT], conf[CONF_TOPIC], conf[CONF_FILTER], + conf[CONF_SECURITY_PROTOCOL], + conf.get(CONF_USERNAME), + conf.get(CONF_PASSWORD), ) hass.bus.async_listen(EVENT_HOMEASSISTANT_STOP, kafka.shutdown()) @@ -74,15 +86,31 @@ class DateTimeJSONEncoder(json.JSONEncoder): class KafkaManager: """Define a manager to buffer events to Kafka.""" - def __init__(self, hass, ip_address, port, topic, entities_filter): + def __init__( + self, + hass, + ip_address, + port, + topic, + entities_filter, + security_protocol, + username, + password, + ): """Initialize.""" self._encoder = DateTimeJSONEncoder() self._entities_filter = entities_filter self._hass = hass + ssl_context = ssl_util.client_context() self._producer = AIOKafkaProducer( loop=hass.loop, bootstrap_servers=f"{ip_address}:{port}", compression_type="gzip", + security_protocol=security_protocol, + ssl_context=ssl_context, + sasl_mechanism="PLAIN", + sasl_plain_username=username, + sasl_plain_password=password, ) self._topic = topic