[ 
https://issues.apache.org/jira/browse/KAFKA-3972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava resolved KAFKA-3972.
------------------------------------------
    Resolution: Invalid
      Assignee: Ewen Cheslack-Postava

> kafka java consumer poll returns 0 records after seekToBeginning
> ----------------------------------------------------------------
>
>                 Key: KAFKA-3972
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3972
>             Project: Kafka
>          Issue Type: Task
>          Components: consumer
>    Affects Versions: 0.10.0.0
>         Environment: docker image elasticsearch:latest, kafka scala version 
> 2.11, kafka version 0.10.0.0
>            Reporter: don caldwell
>            Assignee: Ewen Cheslack-Postava
>              Labels: kafka, polling
>
> kafkacat successfully returns rows for the topic, but the following java 
> source reliably fails to produce rows. I have the suspicion that I am missing 
> some simple thing in my setup, but I have been unable to find a way out. I am 
> using the current docker and using docker network commands to connect the 
> processes in my cluster. The properties are:
> bootstrap.servers: kafka01:9092,kafka02:9092,kafka03:9092
> group.id: dhcp1
> topic: dhcp
> enable.auto.commit: false
> auto.commit.interval.ms: 1000
> session.timeout.ms 30000
> key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
> value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
> the kafka consumer follows. One thing that I find curious is that, although I 
> seem to successfully make the call to seekToBeginning(), when I print offsets 
> on failure, I get large offsets for all partitions although I had expected 
> them to be 0 or at least some small number.
> Here is the code:
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.errors.TimeoutException;
> import org.apache.kafka.common.protocol.types.SchemaException;
> import org.apache.kafka.common.KafkaException;
> import org.apache.kafka.common.Node;
> import org.apache.kafka.common.PartitionInfo;
> import org.apache.kafka.common.TopicPartition;
> import java.io.FileInputStream;
> import java.io.FileNotFoundException;
> import java.io.IOException;
> import java.lang.Integer;
> import java.lang.System;
> import java.lang.Thread;
> import java.lang.InterruptedException;
> import java.util.Arrays;
> import java.util.ArrayList;
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
> public class KConsumer {
>     private Properties prop;
>     private String topic;
>     private Integer polln;
>     private KafkaConsumer<String, String> consumer;
>     private String[] pna = {ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
>             ConsumerConfig.GROUP_ID_CONFIG,
>             ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
>             ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
>             ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
>             ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>             ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG};
>     public KConsumer(String pf) throws FileNotFoundException,
>                     IOException {
>         this.setProperties(pf);
>         this.newClient();
>     }
>     public void setProperties(String p) throws FileNotFoundException,
>                     IOException {
>             this.prop = new Properties();
>             this.prop.load(new FileInputStream(p));
>             this.topic = this.prop.getProperty("topic");
>             this.polln = new Integer(this.prop.getProperty("polln"));
>     }
>     public void setTopic(String t) {
>         this.topic = t;
>     }
>     public String getTopic() {
>         return this.topic;
>     }
>     public void newClient() {
>         System.err.println("creating consumer");
>         Properties kp = new Properties();
>         for(String p : pna) {
>             String v = this.prop.getProperty(p);
>             if(v != null) {
>                 kp.put(p, v);
>             }
>         }
>         //this.consumer = new KafkaConsumer<>(this.prop);
>         this.consumer = new KafkaConsumer<>(kp);
>         //this.consumer.subscribe(Collections.singletonList(this.topic));
>         System.err.println("subscribing to " + this.topic);
>         this.consumer.subscribe(Arrays.asList(this.topic));
>         //this.seekToBeginning();
>     }
>     public void close() {
>         this.consumer.close();
>         this.consumer = null;
>     }
>     public void seekToBeginning() {
>         if(this.topic == null) {
>             System.err.println("KConsumer: topic not set");
>             System.exit(1);
>         }
>         System.err.println("setting partition offset to beginning");
>         java.util.Set<TopicPartition> tps = this.consumer.assignment();
>         this.consumer.seekToBeginning(tps);
>     }
>     public ConsumerRecords<String,String> nextBatch()
>        throws KafkaException {
>         while(true) {
>             try {
>                 System.err.printf("polling...");
>                 ConsumerRecords<String,String> records =
>                     this.consumer.poll(this.polln);
>     
>             System.err.println("returned");
>             System.err.printf("record count %d\n", records.count());
>             return records;
>             } catch(SchemaException e) {
>                 System.err.println("nextBatch: " + e);
>             } catch(KafkaException e) {
>                 System.err.println("nextBatch: " + e);
>                 throw e;
>             } catch(Exception e) {
>                 System.err.println("nextBatch: " + e);
>                 this.consumer.close();
>                 System.exit(1);
>             }
>             try {
>                 System.err.println("sleeping");
>                 Thread.sleep(2000);
>             } catch(InterruptedException e) {
>                 System.err.println(e);
>                 System.exit(0);
>             }
>         }
>     }
>     public void printBatch(ConsumerRecords<String,String> records) {
>         System.err.println("printing...");
>         Iterable<ConsumerRecord<String,String>> ri =
>                records.records(this.topic);
>         for (ConsumerRecord<String, String> record : ri) {
>             System.out.printf("offset = %d, key = %s, value = %s%n",
>                               record.offset(), record.key(),
>                               record.value());
>         }
>     }
>     public void doProcess() {
>         Integer n = 0;
>         Integer f = 0;
>         long total = 0;
>         try {
>             while(true) {
>                 ConsumerRecords<String,String> r = this.nextBatch();
>                 long count = r.count();
>                 if(r.count() > 0) {
>                     total += count;
>                     this.printBatch(r);
>                     n = n + 1;
>                 } else {
>                     f = f + 1;
>                 }
>                 if(f > 10) {
>                     System.err.printf("total %d\n", total);
>                     this.printMisc();
>                     break;
>                 }
>             }
>         } finally {
>             this.consumer.close();
>         }
>     }
>     public void printPosition(int pid) {
>         try {
>             TopicPartition tp = new TopicPartition(this.topic, pid);
>             long pos = this.consumer.position(tp);
>             System.err.printf("    offset: %d\n", pos);
>         } catch(IllegalArgumentException e) {
>             System.err.printf("printPosition: %d %s\n", pid, e);
>         }
>     }
>     public void printMisc() {
>         Map<String,List<PartitionInfo>> topicMap;
>         List<PartitionInfo> partitionList;
>         System.err.println("in printMisc");
>         try {
>             topicMap = this.consumer.listTopics();
>             for(String key: topicMap.keySet()) {
>                 if(key.compareTo(this.topic) != 0) continue;
>                 System.err.printf("topic: %s\n", key);
>                 List<PartitionInfo> pl = topicMap.get(key);
>                 for(PartitionInfo pinf: pl) {
>                     System.err.printf("partition %d\n", pinf.partition());
>                     System.err.printf("    leader %s\n",
>                                     pinf.leader().host());
>                     this.printPosition(pinf.partition());
>                     System.err.printf("    replicas:\n");
>                     for(Node r: pinf.replicas()) {
>                         System.err.printf("    %s %s\n", r.id(), r.host());
>                     }
>                     System.err.printf("    inSyncReplicas:\n");
>                     for(Node r: pinf.inSyncReplicas()) {
>                         System.err.printf("    %s %s\n", r.id(), r.host());
>                     }
>                 }
>             }
>         } catch (TimeoutException e) {
>             System.err.printf("printMisc: %s\n", e);
>             //System.exit(1);
>         }
>     }
>     public static void main(String[] args) throws FileNotFoundException,
>                     IOException, InterruptedException {
>         if(args.length == 1) {
>             Thread.sleep(2000); // docker network connect
>             KConsumer kc = new KConsumer(args[0]);
>             //kc.printMisc();
>             kc.doProcess();
>         } else {
>             System.err.println("Usage KConsumer propfile");
>             System.exit(1);
>         }
>     }
> }



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to