I am trying to configure ssl communication between broker and producer. I followed the instruction on the https://cwiki.apache.org/confluence/display/KAFKA/Deploying+SSL+for+Kafka to create the key and trust store.
My broker comes up without issue, I can run this command - openssl s_client -debug -connect localhost:9093 -tls1_2. It works. So broker is configured currently. I get below when try to producer tries to publish to topic. Plain test port works. C:\JAVA_INSTALLATION\kafka\kafka_2.11-0.9.0.0>bin\windows\kafka-console-producer.bat --broker-list localhost:9093 --topic topic1 adadasdasd [2015-12-10 14:05:24,842] ERROR Error when sending message to topic topic1 with key: null, value: 0 bytes with error: Failed to update metadata after 60000 ms. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) I enable enabled ssl debug on the broker I see below error. I enable ssl debug on producer but do it doesn't produce any details log. In procuder.properties tried to change metadata.broker.list=localhost:9092 to metadata.broker.list=localhost:9093, it didn't help. ( I am thinking it something silly) Using SSLEngineImpl. Allow unsafe renegotiation: false Allow legacy hello messages: true Is initial handshake: true Is secure renegotiation: false kafka-network-thread-0-SSL-3, fatal error: 80: problem unwrapping net record javax.net.ssl.SSLException: Unrecognized SSL message, plaintext connection? kafka-network-thread-0-SSL-3, SEND TLSv1.2 ALERT: fatal, description = internal_error kafka-network-thread-0-SSL-3, WRITE: TLSv1.2 Alert, length = 2 kafka-network-thread-0-SSL-3, called closeOutbound() kafka-network-thread-0-SSL-3, closeOutboundInternal() kafka-network-thread-0-SSL-3, called closeInbound() kafka-network-thread-0-SSL-3, fatal: engine already closed. Rethrowing javax.net.ssl.SSLException: Inbound closed before receiving peer's close_notify: possible truncation attack? kafka-network-thread-0-SSL-3, called closeOutbound() kafka-network-thread-0-SSL-3, closeOutboundInternal() My producer.properties metadata.broker.list=localhost:9092 producer.type=sync compression.codec=none serializer.class=kafka.serializer.DefaultEncoder ############################# SSL settings ############################# # keystore path assume you are starting from kafka install folder security.protocol = SSL ssl.truststore.location = client.truststore.jks ssl.truststore.password = testpass ssl.keystore.location = client.keystore.jks ssl.keystore.password = testpass ssl.key.password = testpass #ssl.provider (Optional). The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.) #ssl.cipher.suites (Optional). "A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol." ssl.enabled.protocols = TLSv1.2 #ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1 **Should list at least one of the protocols configured on the broker side** ssl.truststore.type = JKS ssl.keystore.type = JKS My server.properties broker.id=0 listeners=PLAINTEXT://:9092,SSL://:9093 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 ############################# Log Basics ############################# log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 ############################# Log Flush Policy ############################# ############################# Log Retention Policy ############################# log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 log.cleaner.enable=false ############################# Zookeeper ############################# zookeeper.connect=localhost:2181 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=6000 ############################# SSL settings ############################# # keystore path assume you are starting from kafka install folder ssl.keystore.location = server.keystore.jks ssl.keystore.password = testpass ssl.key.password = testpass ssl.truststore.location = server.truststore.jks ssl.truststore.password = testpass ssl.client.auth = none #ssl.client.auth = none "required" => client authentication is required, "requested" => client authentication is requested and client without certs can still connect when this option chosen") ssl.enabled.protocols = TLSv1.2 #ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1 (list out the SSL protocols that you are going to accept from clients. Do note SSL is deprecated and using that in production is not recommended) ssl.keystore.type = JKS ssl.truststore.type = JKS #security.inter.broker.protocol = SSL no enable for now. Thanks, Shri ________________________________ This message and its contents (to include attachments) are the property of National Health Systems, Inc. and may contain confidential and proprietary information. This email and any files transmitted with it are intended solely for the use of the individual or entity to whom they are addressed. You are hereby notified that any unauthorized disclosure, copying, or distribution of this message, or the taking of any unauthorized action based on information contained herein is strictly prohibited. Unauthorized use of information contained herein may subject you to civil and criminal prosecution and penalties. If you are not the intended recipient, you should delete this message immediately and notify the sender immediately by telephone or by replying to this transmission.