Hi all, I am using kafka 0.10.2 version and I have a kafka cluster with topics configured for 5 partitions. I am trying to use with Kafka consumers assigned manually to multiple partitions. I see that in the consumer poll loop , i am getting records from multiple partitions, but the number of records keeps varying everytime i run the program. One run gives 400 records. but another run gives 200 records. Kafka retention times are configured for more than a day. I dont know if i am using the APIs wrongly. The code that i use is below:
import java.util.*; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; public class ConsumerGroup { public static OffsetAndTimestamp fetchOffsetByTime( KafkaConsumer<Long, String> consumer , TopicPartition partition , long startTime){ Map<TopicPartition, Long> query = new HashMap<>(); query.put( partition, startTime); final Map<TopicPartition, OffsetAndTimestamp> offsetResult = consumer.offsetsForTimes(query); if( offsetResult == null || offsetResult.isEmpty() ) { System.out.println(" No Offset to Fetch "); System.out.println(" Offset Size "+offsetResult.size()); return null; } final OffsetAndTimestamp offsetTimestamp = offsetResult.get(partition); if(offsetTimestamp == null ){ System.out.println("No Offset Found for partition : "+partition.partition()); } return offsetTimestamp; } public static KafkaConsumer<Long, String> assignOffsetToConsumer( KafkaConsumer<Long, String> consumer, String topic , long startTime ){ final List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic); System.out.println("Number of Partitions : "+partitionInfoList.size()); final List<TopicPartition> topicPartitions = new ArrayList<>(); for (PartitionInfo pInfo : partitionInfoList) { TopicPartition partition = new TopicPartition(topic, pInfo.partition()); topicPartitions.add(partition); } consumer.assign(topicPartitions); for(TopicPartition partition : topicPartitions ){ OffsetAndTimestamp offSetTs = fetchOffsetByTime(consumer, partition, startTime); if( offSetTs == null ){ System.out.println("No Offset Found for partition : " + partition.partition()); consumer.seekToEnd(Arrays.asList(partition)); }else { System.out.println(" Offset Found for partition : " +offSetTs.offset()+" " +partition.partition()); System.out.println("FETCH offset success"+ " Offset " + offSetTs.offset() + " offSetTs " + offSetTs); consumer.seek(partition, offSetTs.offset()); } } return consumer; } public static void main(String[] args) throws Exception { if(args.length < 2){ System.out.println("Usage: consumer <topic> <groupname> <startTimestamp>"); return; } String topic = args[0].toString(); String group = args[1].toString(); long start_time_Stamp = Long.parseLong( args[2].toString()); Properties props = new Properties(); props.put("bootstrap.servers", "198.18.134.4:9092,198.18.134.6:9092, 198.18.134.7:9092,198.18.134.8:9092,198.18.134.12:9092 "); props.put("group.id", group); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<Long, String> consumer = new KafkaConsumer<Long, String>(props); assignOffsetToConsumer(consumer, topic, start_time_Stamp); System.out.println("Subscribed to topic " + topic); int i = 0; int arr[] = {0,0,0,0,0}; while (true) { ConsumerRecords<Long, String> records = consumer.poll(1000); int count= 0; for (ConsumerRecord<Long, String> record : records) { count++; if(arr[record.partition()] == 0){ arr[record.partition()] =1; } System.out.println("record=>"+" offset=" +record.offset() + " timestamp="+record.timestamp() + " :"+record); System.out.println("recordcount = "+count+" bitmap"+Arrays.toString(arr)); } } } } What am i missing here? thanks, mugunthan