Sai Pradeep Amaravadhi created KAFKA-10771: ----------------------------------------------
Summary: Unable to consume data from kafka Key: KAFKA-10771 URL: https://issues.apache.org/jira/browse/KAFKA-10771 Project: Kafka Issue Type: Bug Components: consumer Reporter: Sai Pradeep Amaravadhi Below is the code and i am unable to consume data because of "1966 [main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=di-claims-sub-cats-test, groupId=di-claims-sub-cats-test] Connection to node -7 (sp-dev-broker-6.streawsnprd.massmutual.com/10.97.217.35:9092) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.", Code: package kafka;package kafka; import java.util.Arrays;import java.util.Properties; import org.apache.kafka.clients.CommonClientConfigs;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.config.SaslConfigs;import org.apache.kafka.common.config.SslConfigs;import org.apache.kafka.common.serialization.StringDeserializer; import com.fabricapi.GlobalVariables; public class kafkaLastRecord<K, V> extends java.lang.Object { private final static String BOOTSTRAP_SERVERS = "sp-dev-broker-0.streawsnprd.massmutual.com:9092,sp-dev-broker-1.streawsnprd.massmutual.com:9092,sp-dev-broker-2.streawsnprd.massmutual.com:9092,sp-dev-broker-3.streawsnprd.massmutual.com:9092, sp-dev-broker-4.streawsnprd.massmutual.com:9092,sp-dev-broker-5.streawsnprd.massmutual.com:9092,sp-dev-broker-6.streawsnprd.massmutual.com:9092"; private final static String TOPIC = "claims.event.diagreement.initial.request"; public static void main(String[] args) { Properties props = new Properties(); props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); props.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=" + '"' + "di-claims-app" + '"' + " password=" + '"' + "di-claims-secret" + '"' + ";"); props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/src/test/resources/data/mm-cert-bundle.jks"); props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "truststorepass"); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.GROUP_ID_CONFIG, "di-claims-sub-cats-test"); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); KafkaConsumer<String, String> consumer = null; // List of topics to subscribe to while (true) \{ try { consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(TOPIC)); ConsumerRecords<String, String> records = consumer.poll(100); int length = 0; for (ConsumerRecord<String, String> record : records) { length = length + 1; System.out.printf("Offset = %d\n", record.offset()); System.out.printf("Key = %s\n", record.key()); System.out.printf("Value = %s\n", record.value()); } if (GlobalVariables.recordsCountBeforeSendingrequest == length) break; } catch (Exception e) \{ e.printStackTrace(); } finally \{ consumer.close(); } } }} -- This message was sent by Atlassian Jira (v8.3.4#803005)