Sai Pradeep Amaravadhi created KAFKA-10771:
----------------------------------------------

             Summary: Unable to consume data from kafka
                 Key: KAFKA-10771
                 URL: https://issues.apache.org/jira/browse/KAFKA-10771
             Project: Kafka
          Issue Type: Bug
          Components: consumer
            Reporter: Sai Pradeep Amaravadhi


Below is the code and i am unable to consume data because of "1966 [main] WARN 
org.apache.kafka.clients.NetworkClient - [Consumer 
clientId=di-claims-sub-cats-test, groupId=di-claims-sub-cats-test] Connection 
to node -7 (sp-dev-broker-6.streawsnprd.massmutual.com/10.97.217.35:9092) 
terminated during authentication. This may happen due to any of the following 
reasons: (1) Authentication failed due to invalid credentials with brokers 
older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow 
HTTPS traffic), (3) Transient network issue.",

 

Code:

package kafka;package kafka;
import java.util.Arrays;import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;import 
org.apache.kafka.clients.consumer.ConsumerConfig;import 
org.apache.kafka.clients.consumer.ConsumerRecord;import 
org.apache.kafka.clients.consumer.ConsumerRecords;import 
org.apache.kafka.clients.consumer.KafkaConsumer;import 
org.apache.kafka.common.config.SaslConfigs;import 
org.apache.kafka.common.config.SslConfigs;import 
org.apache.kafka.common.serialization.StringDeserializer;
import com.fabricapi.GlobalVariables;
public class kafkaLastRecord<K, V> extends java.lang.Object { private final 
static String BOOTSTRAP_SERVERS = 
"sp-dev-broker-0.streawsnprd.massmutual.com:9092,sp-dev-broker-1.streawsnprd.massmutual.com:9092,sp-dev-broker-2.streawsnprd.massmutual.com:9092,sp-dev-broker-3.streawsnprd.massmutual.com:9092,
 
sp-dev-broker-4.streawsnprd.massmutual.com:9092,sp-dev-broker-5.streawsnprd.massmutual.com:9092,sp-dev-broker-6.streawsnprd.massmutual.com:9092";
 private final static String TOPIC = "claims.event.diagreement.initial.request";
 public static void main(String[] args) { Properties props = new Properties(); 
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); 
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); 
props.put(SaslConfigs.SASL_JAAS_CONFIG, 
"org.apache.kafka.common.security.plain.PlainLoginModule required username=" + 
'"' + "di-claims-app" + '"' + " password=" + '"' + "di-claims-secret" + '"' + 
";"); props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, 
"/src/test/resources/data/mm-cert-bundle.jks"); 
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "truststorepass"); 
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); 
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName()); 
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName()); props.put(ConsumerConfig.GROUP_ID_CONFIG, 
"di-claims-sub-cats-test"); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
"1"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); 
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
 KafkaConsumer<String, String> consumer = null; // List of topics to subscribe 
to while (true) \{ try { consumer = new KafkaConsumer<>(props); 
consumer.subscribe(Arrays.asList(TOPIC)); ConsumerRecords<String, String> 
records = consumer.poll(100); int length = 0; for (ConsumerRecord<String, 
String> record : records) { length = length + 1; System.out.printf("Offset = 
%d\n", record.offset()); System.out.printf("Key    = %s\n", record.key()); 
System.out.printf("Value  = %s\n", record.value()); } if 
(GlobalVariables.recordsCountBeforeSendingrequest == length) break; } catch 
(Exception e) \{ e.printStackTrace(); } finally \{ consumer.close(); } } }}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to