[ https://issues.apache.org/jira/browse/KAFKA-3972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ewen Cheslack-Postava resolved KAFKA-3972. ------------------------------------------ Resolution: Invalid Assignee: Ewen Cheslack-Postava > kafka java consumer poll returns 0 records after seekToBeginning > ---------------------------------------------------------------- > > Key: KAFKA-3972 > URL: https://issues.apache.org/jira/browse/KAFKA-3972 > Project: Kafka > Issue Type: Task > Components: consumer > Affects Versions: 0.10.0.0 > Environment: docker image elasticsearch:latest, kafka scala version > 2.11, kafka version 0.10.0.0 > Reporter: don caldwell > Assignee: Ewen Cheslack-Postava > Labels: kafka, polling > > kafkacat successfully returns rows for the topic, but the following java > source reliably fails to produce rows. I have the suspicion that I am missing > some simple thing in my setup, but I have been unable to find a way out. I am > using the current docker and using docker network commands to connect the > processes in my cluster. The properties are: > bootstrap.servers: kafka01:9092,kafka02:9092,kafka03:9092 > group.id: dhcp1 > topic: dhcp > enable.auto.commit: false > auto.commit.interval.ms: 1000 > session.timeout.ms 30000 > key.deserializer: org.apache.kafka.common.serialization.StringDeserializer > value.deserializer: org.apache.kafka.common.serialization.StringDeserializer > the kafka consumer follows. One thing that I find curious is that, although I > seem to successfully make the call to seekToBeginning(), when I print offsets > on failure, I get large offsets for all partitions although I had expected > them to be 0 or at least some small number. > Here is the code: > import org.apache.kafka.clients.consumer.ConsumerConfig; > import org.apache.kafka.clients.consumer.ConsumerRecord; > import org.apache.kafka.clients.consumer.ConsumerRecords; > import org.apache.kafka.clients.consumer.KafkaConsumer; > import org.apache.kafka.common.errors.TimeoutException; > import org.apache.kafka.common.protocol.types.SchemaException; > import org.apache.kafka.common.KafkaException; > import org.apache.kafka.common.Node; > import org.apache.kafka.common.PartitionInfo; > import org.apache.kafka.common.TopicPartition; > import java.io.FileInputStream; > import java.io.FileNotFoundException; > import java.io.IOException; > import java.lang.Integer; > import java.lang.System; > import java.lang.Thread; > import java.lang.InterruptedException; > import java.util.Arrays; > import java.util.ArrayList; > import java.util.Collections; > import java.util.List; > import java.util.Map; > import java.util.Properties; > public class KConsumer { > private Properties prop; > private String topic; > private Integer polln; > private KafkaConsumer<String, String> consumer; > private String[] pna = {ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, > ConsumerConfig.GROUP_ID_CONFIG, > ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, > ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, > ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, > ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, > ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG}; > public KConsumer(String pf) throws FileNotFoundException, > IOException { > this.setProperties(pf); > this.newClient(); > } > public void setProperties(String p) throws FileNotFoundException, > IOException { > this.prop = new Properties(); > this.prop.load(new FileInputStream(p)); > this.topic = this.prop.getProperty("topic"); > this.polln = new Integer(this.prop.getProperty("polln")); > } > public void setTopic(String t) { > this.topic = t; > } > public String getTopic() { > return this.topic; > } > public void newClient() { > System.err.println("creating consumer"); > Properties kp = new Properties(); > for(String p : pna) { > String v = this.prop.getProperty(p); > if(v != null) { > kp.put(p, v); > } > } > //this.consumer = new KafkaConsumer<>(this.prop); > this.consumer = new KafkaConsumer<>(kp); > //this.consumer.subscribe(Collections.singletonList(this.topic)); > System.err.println("subscribing to " + this.topic); > this.consumer.subscribe(Arrays.asList(this.topic)); > //this.seekToBeginning(); > } > public void close() { > this.consumer.close(); > this.consumer = null; > } > public void seekToBeginning() { > if(this.topic == null) { > System.err.println("KConsumer: topic not set"); > System.exit(1); > } > System.err.println("setting partition offset to beginning"); > java.util.Set<TopicPartition> tps = this.consumer.assignment(); > this.consumer.seekToBeginning(tps); > } > public ConsumerRecords<String,String> nextBatch() > throws KafkaException { > while(true) { > try { > System.err.printf("polling..."); > ConsumerRecords<String,String> records = > this.consumer.poll(this.polln); > > System.err.println("returned"); > System.err.printf("record count %d\n", records.count()); > return records; > } catch(SchemaException e) { > System.err.println("nextBatch: " + e); > } catch(KafkaException e) { > System.err.println("nextBatch: " + e); > throw e; > } catch(Exception e) { > System.err.println("nextBatch: " + e); > this.consumer.close(); > System.exit(1); > } > try { > System.err.println("sleeping"); > Thread.sleep(2000); > } catch(InterruptedException e) { > System.err.println(e); > System.exit(0); > } > } > } > public void printBatch(ConsumerRecords<String,String> records) { > System.err.println("printing..."); > Iterable<ConsumerRecord<String,String>> ri = > records.records(this.topic); > for (ConsumerRecord<String, String> record : ri) { > System.out.printf("offset = %d, key = %s, value = %s%n", > record.offset(), record.key(), > record.value()); > } > } > public void doProcess() { > Integer n = 0; > Integer f = 0; > long total = 0; > try { > while(true) { > ConsumerRecords<String,String> r = this.nextBatch(); > long count = r.count(); > if(r.count() > 0) { > total += count; > this.printBatch(r); > n = n + 1; > } else { > f = f + 1; > } > if(f > 10) { > System.err.printf("total %d\n", total); > this.printMisc(); > break; > } > } > } finally { > this.consumer.close(); > } > } > public void printPosition(int pid) { > try { > TopicPartition tp = new TopicPartition(this.topic, pid); > long pos = this.consumer.position(tp); > System.err.printf(" offset: %d\n", pos); > } catch(IllegalArgumentException e) { > System.err.printf("printPosition: %d %s\n", pid, e); > } > } > public void printMisc() { > Map<String,List<PartitionInfo>> topicMap; > List<PartitionInfo> partitionList; > System.err.println("in printMisc"); > try { > topicMap = this.consumer.listTopics(); > for(String key: topicMap.keySet()) { > if(key.compareTo(this.topic) != 0) continue; > System.err.printf("topic: %s\n", key); > List<PartitionInfo> pl = topicMap.get(key); > for(PartitionInfo pinf: pl) { > System.err.printf("partition %d\n", pinf.partition()); > System.err.printf(" leader %s\n", > pinf.leader().host()); > this.printPosition(pinf.partition()); > System.err.printf(" replicas:\n"); > for(Node r: pinf.replicas()) { > System.err.printf(" %s %s\n", r.id(), r.host()); > } > System.err.printf(" inSyncReplicas:\n"); > for(Node r: pinf.inSyncReplicas()) { > System.err.printf(" %s %s\n", r.id(), r.host()); > } > } > } > } catch (TimeoutException e) { > System.err.printf("printMisc: %s\n", e); > //System.exit(1); > } > } > public static void main(String[] args) throws FileNotFoundException, > IOException, InterruptedException { > if(args.length == 1) { > Thread.sleep(2000); // docker network connect > KConsumer kc = new KConsumer(args[0]); > //kc.printMisc(); > kc.doProcess(); > } else { > System.err.println("Usage KConsumer propfile"); > System.exit(1); > } > } > } -- This message was sent by Atlassian JIRA (v6.3.4#6332)