I'm having an issue with Kafka. Whenever the consumer uses SASL_SSL or SASL_PLAINTEXT, it can't start. When I change the consumer to use PLAINTEXT or SSL (without SASL), it works fine.
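To make the comparison concrete, here is a rough sketch of the four consumer variants described above. The ports, credentials and CA file path are the ones used elsewhere in this post; the helper name `consumer_kwargs` and the `LISTENER_PORTS` table are made up for illustration and are not part of kafka-python.

```python
# Ports copied from the `listeners` line of server.properties in this post.
LISTENER_PORTS = {
    "PLAINTEXT": 9092,
    "SSL": 9093,
    "SASL_SSL": 9094,
    "SASL_PLAINTEXT": 9095,
}


def consumer_kwargs(security_protocol, host="127.0.0.1"):
    """Build KafkaConsumer keyword arguments for one listener.

    Hypothetical helper: it only assembles a dict and does not connect.
    """
    if security_protocol not in LISTENER_PORTS:
        raise ValueError("unknown security protocol: %r" % security_protocol)

    kwargs = {
        "bootstrap_servers": "%s:%d" % (host, LISTENER_PORTS[security_protocol]),
        "security_protocol": security_protocol,
    }
    # TLS settings only apply to the SSL and SASL_SSL listeners.
    if security_protocol.endswith("SSL"):
        kwargs["ssl_check_hostname"] = False
        kwargs["ssl_cafile"] = "/etc/ssl/certs/ca-cert"
    # SASL settings only apply to the SASL_* listeners.
    if security_protocol.startswith("SASL"):
        kwargs["sasl_mechanism"] = "SCRAM-SHA-512"
        kwargs["sasl_plain_username"] = "alice"
        kwargs["sasl_plain_password"] = "alice-secret"
    return kwargs


if __name__ == "__main__":
    for protocol in LISTENER_PORTS:
        print(protocol, consumer_kwargs(protocol))
```

Passing one of these dicts as `KafkaConsumer('test-topic', **consumer_kwargs('SASL_SSL'))` should be equivalent to the failing call shown further down; the PLAINTEXT and SSL variants are the ones that work for me.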
Below are my configuration and the error.

*config/server.properties:*

    listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094,SASL_PLAINTEXT://localhost:9095
    ssl.keystore.location=/root/zoo-kaf/kafka_2.12-3.2.3/ssl/KeyStore.jks
    ssl.keystore.password=123456
    ssl.key.password=123456
    ssl.truststore.location=/root/zoo-kaf/kafka_2.12-3.2.3/ssl/truststore.jks
    ssl.truststore.password=123456
    sasl.enabled.mechanisms=SCRAM-SHA-512,PLAIN
    listener.name.sasl_ssl.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
        username="alice" \
        password="alice-secret" \
        user_alice="alice-secret";
    listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="alice" \
        password="alice-secret" \
        user_alice="alice-secret";
    listener.name.sasl_plaintext.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
        username="alice" \
        password="alice-secret" \
        user_alice="alice-secret";
    listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="alice" \
        password="alice-secret" \
        user_alice="alice-secret";
    ...

Also, can the username `alice` be used for `sasl_plain_username`?

*consumer.py:*

    from kafka import KafkaConsumer
    import logging

    logging.basicConfig(level=logging.DEBUG)
    # To consume latest messages and auto-commit offsets
    consumer = KafkaConsumer('test-topic',
                             bootstrap_servers='127.0.0.1:9094',
                             security_protocol="SASL_SSL",
                             sasl_mechanism='SCRAM-SHA-512',
                             sasl_plain_username='alice',
                             sasl_plain_password='alice-secret',
                             ssl_check_hostname=False,
                             ssl_cafile="/etc/ssl/certs/ca-cert")

Consumer log:

    DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9094 <disconnected> [IPv4 ('127.0.0.1', 9094)]>: creating new socket
    DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9094 <disconnected> [IPv4 ('127.0.0.1', 9094)]>: setting socket option (6, 1, 1)
    INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9094 <connecting> [IPv4 ('127.0.0.1', 9094)]>: connecting to 127.0.0.1:9094 [('127.0.0.1', 9094) IPv4]
    DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9094 <connecting> [IPv4 ('127.0.0.1', 9094)]>: established TCP connection
    DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9094 <connecting> [IPv4 ('127.0.0.1', 9094)]>: initiating SSL handshake
    DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9094 <handshake> [IPv4 ('127.0.0.1', 9094)]>: wrapping socket in ssl context
    DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9094 <handshake> [IPv4 ('127.0.0.1', 9094)]>: completed SSL handshake.
    DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9094 <handshake> [IPv4 ('127.0.0.1', 9094)]>: initiating SASL authentication
    DEBUG:kafka.protocol.parser:Sending request SaslHandShakeRequest_v0(mechanism='SCRAM-SHA-512')
    DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9094 <authenticating> [IPv4 ('127.0.0.1', 9094)]> Request 1: SaslHandShakeRequest_v0(mechanism='SCRAM-SHA-512')
    DEBUG:kafka.protocol.parser:Received correlation id: 1
    DEBUG:kafka.protocol.parser:Processing response SaslHandShakeResponse_v0
    DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9094 <authenticating> [IPv4 ('127.0.0.1', 9094)]> Response 1 (0.7309913635253906 ms): SaslHandShakeResponse_v0(error_code=0, enabled_mechanisms=['SCRAM-SHA-512', 'PLAIN'])
    ERROR:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9094 <authenticating> [IPv4 ('127.0.0.1', 9094)]>: Error receiving reply from server
    Traceback (most recent call last):
      File "/usr/local/lib/python3.6/site-packages/kafka/conn.py", line 692, in _try_authenticate_scram
        (data_len,) = struct.unpack('>i', self._recv_bytes_blocking(4))
      File "/usr/local/lib/python3.6/site-packages/kafka/conn.py", line 616, in _recv_bytes_blocking
        raise ConnectionError('Connection reset during recv')
    ConnectionError: Connection reset during recv
    INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9094 <authenticating> [IPv4 ('127.0.0.1', 9094)]>: Closing connection.
    KafkaConnectionError: <BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9094 <authenticating> [IPv4 ('127.0.0.1', 9094)]>: Connection reset during recv
    DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9094 <authenticating> [IPv4 ('127.0.0.1', 9094)]>: reconnect backoff 0.055656264781808004 after 1 failures
    Traceback (most recent call last):
      File "consumer-scram-ssl.py", line 6, in <module>
        consumer = KafkaConsumer('test-topic', bootstrap_servers='127.0.0.1:9094', security_protocol="SASL_SSL", sasl_mechanism='SCRAM-SHA-512', sasl_plain_username='alice', sasl_plain_password='alice-secret', ssl_check_hostname=False, ssl_cafile="/etc/ssl/certs/ca-cert")
      File "/usr/local/lib/python3.6/site-packages/kafka/consumer/group.py", line 356, in __init__
        self._client = KafkaClient(metrics=self._metrics, **self.config)
      File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 244, in __init__
        self.config['api_version'] = self.check_version(timeout=check_timeout)
      File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 900, in check_version
        raise Errors.NoBrokersAvailable()

The broker itself doesn't log anything when this happens.