Hi Ping, 1) Yes, the records already exist. 2) Yes, the Kafka broker has also been upgraded to version 3.6; that is why we are upgrading the client code as well. 3) Do I need to compare any settings between the old broker and the new broker?
Please help. Thanks, Giri On Mon, 25 Nov 2024 at 6:45 AM, Chia-Ping Tsai <chia7...@gmail.com> wrote: > hi > > 1) The records you want to read already exist, right? > > 2) Are all your code changes solely focused on upgrading the client code from version 0.8 to 3.6? Or does the broker get upgraded as well? > > Thanks, > Chia-Ping > > > giri mungi <girimung...@gmail.com> 於 2024年11月25日 凌晨1:27 寫道: > > > > Hi Ping, > > > > My bad, commitSync is not required; we can ignore that. > > > > I am calculating the diff as below: > > > > long stime = Calendar.getInstance().getTimeInMillis(); > > ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2000)); > > long etime = Calendar.getInstance().getTimeInMillis(); > > log.info("Poll Records Count :{} Diff :{}", records.count(), (etime - stime)); > > > > Thanks, > > Giri > > > >> On Sun, Nov 24, 2024 at 10:48 PM Chia-Ping Tsai <chia7...@gmail.com> wrote: > >> > >> hi Giri > >> > >> 1. Why do you call `commitSync`? It seems your application does not use a consumer group, since your use case is random reads, right? > >> 2. How do you calculate "diff"? > >> > >> giri mungi <girimung...@gmail.com> 於 2024年11月25日 週一 上午12:50寫道: > >> > >>> Hi Ping, > >>> > >>> Please find the details below: > >>> > >>> 1) Kafka broker version is 3.6.1 > >>> > >>> 2) Logic explanation: > >>> > >>> The code polls messages from Kafka using consumer.poll(Duration.ofMillis(2000)). > >>> > >>> *Exit condition:* The loop exits when the message limit of 1000 is reached; the end flag is then set to true and the loop exits. > >>> > >>> > >>> boolean end = false; > >>> consumer.seek(topicPartition, readOffset); > >>> do { > >>> JSONObject obj = null; > >>> long stime = Calendar.getInstance().getTimeInMillis(); > >>> ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2000)); > >>> for (ConsumerRecord<String, String> consumerRecord : records) { > >>> String message = consumerRecord.value(); > >>> obj = new JSONObject(message); > >>> msglist.add(obj); > >>> } > >>> if (msglist.size() >= 1000) { > >>> end = true; > >>> consumer.commitSync(); > >>> } > >>> } while (!end); > >>> > >>> > >>> Thanks, > >>> Giri > >>> > >>> On Sun, Nov 24, 2024 at 9:23 PM Chia-Ping Tsai <chia7...@gmail.com> > >> wrote: > >>> > >>>> hi > >>>> > >>>> 1) Could you share the broker version with us? > >>>> 2) Could you explain how the sample code works? What is "end"? > >>>> > >>>> ``` > >>>> do { > >>>> ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1500)); > >>>> } while (!end) > >>>> ``` > >>>> > >>>> thanks, > >>>> Chia-Ping > >>>> > >>>> > >>>> giri mungi <girimung...@gmail.com> 於 2024年11月24日 週日 下午11:15寫道: > >>>> > >>>>> Hi, > >>>>> > >>>>> All of them are from the same consumer in each poll. > >>>>> > >>>>> Before the poll we seek to the user-supplied offset and try to consume the next 1000 messages. > >>>>> > >>>>> Thanks > >>>>> Giridhar. > >>>>> > >>>>> On Sun, 24 Nov 2024 at 8:40 PM, Chia-Ping Tsai <chia7...@gmail.com> > >>>> wrote: > >>>>> > >>>>>> hi > >>>>>> > >>>>>>> Poll Records Count :0 diff :2004 > >>>>>> Poll Records Count :500 diff :943 > >>>>>> > >>>>>> Are they from the same consumer in each poll? Or are they based on different "offset" values and separate consumers' polls? > >>>>>> > >>>>>> thanks, > >>>>>> Chia-Ping > >>>>>> > >>>>>> giri mungi <girimung...@gmail.com> 於 2024年11月24日 週日 下午8:51寫道: > >>>>>> > >>>>>>> Do I need to check any settings at the Kafka server level? 
> >>>>>>> > >>>>>>> On Sun, Nov 24, 2024 at 6:19 PM giri mungi <girimung...@gmail.com> wrote: > >>>>>>> > >>>>>>>> Hi, I have set the properties as below: > >>>>>>>> > >>>>>>>> props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1750000"); > >>>>>>>> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500"); > >>>>>>>> props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "1500"); > >>>>>>>> props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1750000"); > >>>>>>>> > >>>>>>>> Poll Records Count :0 diff :2004 > >>>>>>>> Poll Records Count :500 diff :943 > >>>>>>>> Poll Records Count :0 diff :2000 > >>>>>>>> Poll Records Count :44 diff :3 > >>>>>>>> Poll Records Count :500 diff :205 > >>>>>>>> Poll Records Count :488 diff :1978 > >>>>>>>> > >>>>>>>> If I set max records to 500 or 1000, then sometimes I get a count of "0" and the poll takes the full max wait time, which impacts the overall response time. > >>>>>>>> > >>>>>>>> Even though I have set *fetch.min.bytes and max.poll.records, I am still getting "0" records, which impacts the overall response time.* > >>>>>>>> > >>>>>>>> How can I avoid this? Please suggest. > >>>>>>>> > >>>>>>>> Thanks, > >>>>>>>> Giridar > >>>>>>>> > >>>>>>>> On Sun, Nov 24, 2024 at 5:26 PM Chia-Ping Tsai <chia7...@apache.org> wrote: > >>>>>>>>> > >>>>>>>>> hi Giridar > >>>>>>>>> > >>>>>>>>> It seems your use case involves random reads, and you expect the consumer to return 1000 records from the server at once. Therefore, you could increase the wait time (fetch.max.wait.ms) and fetch size (fetch.min.bytes) to receive a larger response with as many records as possible. > >>>>>>>>> > >>>>>>>>> A suitable value for fetch.min.bytes might be 'average size of records * 1000'. You may also need to increase max.partition.fetch.bytes. For fetch.max.wait.ms, a value of 1500 ms might match your expectations. 
> >>>>>>>>> > >>>>>>>>> Best, > >>>>>>>>> Chia-Ping > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> On 2024/11/24 10:55:23 giri mungi wrote: > >>>>>>>>>> Hi Yang, > >>>>>>>>>> > >>>>>>>>>> *This is the old code which is perfectly doing fine and > >>>> returning > >>>>>> less > >>>>>>>>> than > >>>>>>>>>> 3 seconds for all 1000 records.* > >>>>>>>>>> > >>>>>>>>>> do { > >>>>>>>>>> FetchRequest req = new > >>>> FetchRequestBuilder().clientId(clientName) > >>>>>>>>>> .addFetch(a_topic, a_partition, readOffset, fetchSize) > >>>>>>>>>> .build(); > >>>>>>>>>> > >>>>>>>>>> FetchResponse fetchResponse = consumer.fetch(req); > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> JSONObject obj = null; > >>>>>>>>>> ByteBufferMessageSet set = fetchResponse.messageSet(a_topic, > >>>>>>>>>> a_partition); > >>>>>>>>>> > >>>>>>>>>> for (MessageAndOffset messageAndOffset : set) { > >>>>>>>>>> String message = null; > >>>>>>>>>> if (messageAndOffset.offset() < readOffset) { > >>>>>>>>>> log.warn("Found an old offset: {} ,Expecting: {}", > >>>>>>>>>> messageAndOffset.offset(), readOffset); > >>>>>>>>>> continue; > >>>>>>>>>> } > >>>>>>>>>> > >>>>>>>>>> ByteBuffer payload = messageAndOffset.message().payload(); > >>>>>>>>>> > >>>>>>>>>> byte[] bytes = new byte[payload.limit()]; > >>>>>>>>>> payload.get(bytes); > >>>>>>>>>> try { > >>>>>>>>>> message = new String(bytes, "UTF-8"); > >>>>>>>>>> } catch (UnsupportedEncodingException ue) { > >>>>>>>>>> log.warn(ue.getMessage(), ue); > >>>>>>>>>> message = new String(bytes); > >>>>>>>>>> } > >>>>>>>>>> > >>>>>>>>>> obj = new JSONObject(message); > >>>>>>>>>> msglist.add(new JSONObject(message)); > >>>>>>>>>> } > >>>>>>>>>> readOffset = messageAndOffset.nextOffset(); > >>>>>>>>>> > >>>>>>>>>> if (msglist.size() >= Math.round(limit > >>>>>>>>>> / inputReq.getApplicationArea().getReqInfo().size()) > >>>>>>>>>> || (endTime - startTime) >= waitTime) { > >>>>>>>>>> log.info( > >>>>>>>>>> "Wait condition has been met... exiting the fetch loop. > >>>>> recordCount > >>>>>> - > >>>>>>>>> {}, > >>>>>>>>>> time exhausted - {} ms.", > >>>>>>>>>> msglist.size(), (endTime - startTime)); > >>>>>>>>>> end = true; > >>>>>>>>>> } > >>>>>>>>>> > >>>>>>>>>> } while (!end); > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> *The new code is taking 8 seconds.. 
which is more than > >> double > >>>> the > >>>>>>> time* > >>>>>>>>>> > >>>>>>>>>> TopicPartition topicPartition = new TopicPartition("Test", > >> 0); > >>>>>>>>>> > >>>>> consumer.seekToBeginning(Collections.singletonList(topicPartition)); > >>>>>>>>>> long kafkaEarliestOffset = > >> consumer.position(topicPartition); > >>>>>>>>>> try (KafkaConsumer < String, String > consumer = > >>>>>>>>>> KafkaConsumerFactory.createConsumer(clientName, fetchSize)) > >> { > >>>>>>>>>> consumer.partitionsFor(topicName); > >>>>>>>>>> > >>> consumer.assign(Collections.singletonList(topicPartition)); > >>>>>>>>>> consumer.seek(topicPartition, readOffset); > >>>>>>>>>> do { > >>>>>>>>>> ConsumerRecords < String, String > records = > >>>>>>>>>> consumer.poll(Duration.ofMillis(1500)); > >>>>>>>>>> } while (!end) > >>>>>>>>>> > >>>>>>>>>> public static KafkaConsumer<String,String> > >>> createConsumer(String > >>>>>>>>>> clientName,int fetchSize) { > >>>>>>>>>> Properties props = new Properties(); > >>>>>>>>>> String kafkaBrokerStr = > >>>>>>>>>> Config.getConsumerPropValue("kafkabrokerslist"); > >>>>>>>>>> String groupId = Config.getConsumerPropValue(" > >>>>> group.id > >>>>>> "); > >>>>>>>>>> > >>>>>>>>> > >> props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, > >>>>>>>>>> StringDeserializer.class.getName()); > >>>>>>>>>> > >>>>>>>>>> > >>>> props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, > >>>>>>>>>> StringDeserializer.class.getName()); > >>>>>>>>>> > >> props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, > >>>> ""); > >>>>>>>>>> > >>>>>>> props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, > >>>>>>>>>> "false"); > >>>>>>>>>> > >>>>>> props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, > >>>>>>>>>> "earliest"); > >>>>>>>>>> props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, > >>>>>> "1024"); > >>>>>>>>>> > >>>>> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"500"); > >>>>>>>>>> > >> props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, > >>>>>> "500"); > >>>>>>>>>> return new KafkaConsumer<String,String>(props); > >>>>>>>>>> } > >>>>>>>>>> > >>>>>>>>>> How to improve the performance on this. > >>>>>>>>>> > >>>>>>>>>> On Sun, Nov 24, 2024 at 4:11 PM giri mungi < > >>>> girimung...@gmail.com > >>>>>> > >>>>>>>>> wrote: > >>>>>>>>>> > >>>>>>>>>>> Hi Yang, > >>>>>>>>>>> > >>>>>>>>>>> Can i get the records from kafka as bytes or compression > >>> form > >>>> so > >>>>>>> that > >>>>>>>>> i > >>>>>>>>>>> will take less time from kafka. > >>>>>>>>>>> > >>>>>>>>>>> I can build messages from those bytes.Is that possible? > >>>>>>>>>>> > >>>>>>>>>>> Can you please give suggestions on this. > >>>>>>>>>>> > >>>>>>>>>>> Thanks, > >>>>>>>>>>> Giridar > >>>>>>>>>>> > >>>>>>>>>>> On Sun, Nov 24, 2024 at 3:50 PM giri mungi < > >>>>> girimung...@gmail.com > >>>>>>> > >>>>>>>>> wrote: > >>>>>>>>>>> > >>>>>>>>>>>> Hi Yang, > >>>>>>>>>>>> > >>>>>>>>>>>> Thanks for your reply. 
> >>>>>>>>>>>> > >>>>>>>>>>>> Now what should I do to improve my performance?Because > >> the > >>>> old > >>>>>>> kafka > >>>>>>>>> code > >>>>>>>>>>>> was good in performance > >>>>>>>>>>>> > >>>>>>>>>>>> These are the properties: > >>>>>>>>>>>> > >>>>>>>>>>>> props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, ""); > >>>>>>>>>>>> > >> props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, > >>>>>>>>>>>> "false"); > >>>>>>>>>>>> > >> props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, > >>>>>>>>>>>> "earliest"); > >>>>>>>>>>>> props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, > >> "1024"); > >>>>>>>>>>>> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"500"); > >>>>>>>>>>>> props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, > >> "500"); > >>>>>>>>>>>> > >>>>>>>>>>>> How to decrease this time ,plz suggest . > >>>>>>>>>>>> > >>>>>>>>>>>> Thanks, > >>>>>>>>>>>> Giridar > >>>>>>>>>>>> > >>>>>>>>>>>> On Sun, Nov 24, 2024 at 3:15 PM PoAn Yang < > >>>> yangp...@gmail.com> > >>>>>>>>> wrote: > >>>>>>>>>>>> > >>>>>>>>>>>>> Hi Giridar, > >>>>>>>>>>>>> > >>>>>>>>>>>>>> *Code explanation:Fetching records is taking time for > >>> the > >>>>>> first > >>>>>>>>> poll.* > >>>>>>>>>>>>>> Poll Records Count: 500 diff: 1284 > >>>>>>>>>>>>>> Poll Records Count: 500 diff: 3 > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> For the first 500 records it took 1284 ms and next 500 > >>>>> records > >>>>>>> it > >>>>>>>>> took > >>>>>>>>>>>>> 4 ms > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> *Why this much difference? I would like to improve the > >>>>>>>>> performance of > >>>>>>>>>>>>> the > >>>>>>>>>>>>>> first poll time?* > >>>>>>>>>>>>> > >>>>>>>>>>>>> IIRC, a consumer has FetchBuffer (cache). > >>>>>>>>>>>>> If a FetchResponse can return records morn than > >>>>>>> `max.poll.records`, > >>>>>>>>> the > >>>>>>>>>>>>> extra records will be stored in FetchBuffer. > >>>>>>>>>>>>> That is why the second poll can get data in short time. > >>>>>>>>>>>>> IMO, we don’t need to balance each poll time, because > >> that > >>>>> means > >>>>>>> we > >>>>>>>>>>>>> don’t want local cache. > >>>>>>>>>>>>> > >>>>>>>>>>>>>> For example if i give "0" as input offset and it is > >>> taking > >>>>>> time > >>>>>>> as > >>>>>>>>>>>>> below (6 > >>>>>>>>>>>>>> seconds) and not getting 500 records also,it is > >> getting > >>>> only > >>>>>> 200 > >>>>>>>>>>>>> records > >>>>>>>>>>>>>> per poll and taking lot of time...why this is > >> happening > >>>> and > >>>>>> how > >>>>>>> to > >>>>>>>>>>>>> avoid > >>>>>>>>>>>>>> this. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Poll Records Count :292 Time taken :1227 ms > >>>>>>>>>>>>>> Poll Records Count :292 Time taken :1181 ms > >>>>>>>>>>>>>> Poll Records Count :296 Time taken:1234 ms > >>>>>>>>>>>>>> Poll Records Count :292 Time taken:1133 ms > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> *If I give an offset as 110999 and it is getting some > >>> fast > >>>>> and > >>>>>>>>> records > >>>>>>>>>>>>>> getting as 500 each..Why this difference please.* > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Poll Records Count :500 Time taken:1284 ms > >>>>>>>>>>>>>> Poll Records Count :500 Time taken:3 ms > >>>>>>>>>>>>> > >>>>>>>>>>>>> IIUC, a FetchRequest has a limitation from > >>>> `fetch.max.bytes`. > >>>>>>>>>>>>> If the record size from offset “0” is bigger than from > >>>> offset > >>>>>>>>> “110999”, > >>>>>>>>>>>>> then a FetchResponse returns less records. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Please correct me if I misunderstood anything. 
> >>>>>>>>>>>>> > >>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>> PoAn > >>>>>>>>>>>>> > >>>>>>>>>>>>>> On Nov 24, 2024, at 2:09 PM, giri mungi < > >>>>>> girimung...@gmail.com> > >>>>>>>>> wrote: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Hi Team, > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Good day to you. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Iam Giridhar.I need your suggestions in kafka > >>>>>>>>>>>>>> performance improvement please. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> *Scenario is: The user will give the offset as input > >> and > >>>>> based > >>>>>>> on > >>>>>>>>> the > >>>>>>>>>>>>>> offset we need to give the next 1000 messages from > >> kafka > >>>>> topic > >>>>>>>>> and next > >>>>>>>>>>>>>> offset.The kafka topic contains only one partition.* > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> We are trying to migrate from old kafka to new > >> kafka.In > >>>> the > >>>>>> old > >>>>>>>>> kafka > >>>>>>>>>>>>> we > >>>>>>>>>>>>>> were using code like: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> *old code(kafka clients 0.8 .1):* > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> FetchRequest req = new > >>>>>>>>>>>>>> > >>>> FetchRequestBuilder().clientId(clientName).addFetch(a_topic, > >>>>>>>>>>>>>> a_partition, readOffset, fetchSize).build(); > >>>>>>>>>>>>>> FetchResponse fetchResponse = consumer.fetch(req); > >>>>>>>>>>>>>> ByteBufferMessageSet set = > >>>> fetchResponse.messageSet(a_topic, > >>>>>>>>>>>>> a_partition); > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> This code is super fast and same we are trying to > >>> achieve > >>>>>> using > >>>>>>>>>>>>>> KafkaConsumer API and getting slowness > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> *New kafkaconsumer code is using(kafka clients 3.6 > >> .1)* > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> TopicPartition topicPartition = new > >>> TopicPartition("Test", > >>>>> 0); > >>>>>>>>>>>>>> > >>>>>>>>> > >>>> consumer.seekToBeginning(Collections.singletonList(topicPartition)); > >>>>>>>>>>>>>> long kafkaEarliestOffset = > >>>>> consumer.position(topicPartition); > >>>>>>>>>>>>>> try (KafkaConsumer < String, String > consumer = > >>>>>>>>>>>>>> KafkaConsumerFactory.createConsumer(clientName, > >>>> fetchSize)) > >>>>> { > >>>>>>>>>>>>>> > >>>>> consumer.assign(Collections.singletonList(topicPartition)); > >>>>>>>>>>>>>> consumer.seek(topicPartition, readOffset); > >>>>>>>>>>>>>> do { > >>>>>>>>>>>>>> ConsumerRecords < String, String > records = > >>>>>>>>>>>>>> consumer.poll(Duration.ofMillis(1500)); > >>>>>>>>>>>>>> } while (!end) > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> public static KafkaConsumer<String,String> > >>>>>> createConsumer(String > >>>>>>>>>>>>>> clientName,int fetchSize) { > >>>>>>>>>>>>>> Properties props = new Properties(); > >>>>>>>>>>>>>> String kafkaBrokerStr = > >>>>>>>>>>>>>> Config.getConsumerPropValue("kafkabrokerlist"); > >>>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>> props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, > >>>>>>>>>>>>>> StringDeserializer.class.getName()); > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>> props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, > >>>>>>>>>>>>>> StringDeserializer.class.getName()); > >>>>>>>>>>>>>> > >>>> props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, > >>>>>>> ""); > >>>>>>>>>>>>>> > >>>>>>>>> props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, > >>>>>>>>>>>>>> "false"); > >>>>>>>>>>>>>> > >>>>>>>>> props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, > >>>>>>>>>>>>>> "earliest"); > >>>>>>>>>>>>>> > >>>> props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, > >>>>>>>>> "1024"); > >>>>>>>>>>>>>> > >>>>>>>>> 
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"500"); > >>>>>>>>>>>>>> > >>>>> props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, > >>>>>>>>> "500"); > >>>>>>>>>>>>>> return new > >>> KafkaConsumer<String,String>(props); > >>>>>>>>>>>>>> } > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> *Code explanation:Fetching records is taking time for > >>> the > >>>>>> first > >>>>>>>>> poll.* > >>>>>>>>>>>>>> Poll Records Count: 500 diff: 1284 > >>>>>>>>>>>>>> Poll Records Count: 500 diff: 3 > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> For the first 500 records it took 1284 ms and next 500 > >>>>> records > >>>>>>> it > >>>>>>>>> took > >>>>>>>>>>>>> 4 ms > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> *Why this much difference? I would like to improve the > >>>>>>>>> performance of > >>>>>>>>>>>>> the > >>>>>>>>>>>>>> first poll time?* > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 1) How to fetch first 500 records in less time > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> *I am also seeing one strange issue.My kafka topic > >> which > >>>> has > >>>>>> one > >>>>>>>>>>>>> partition > >>>>>>>>>>>>>> contains some 5 lakh records*.*The starting records > >> take > >>>>> more > >>>>>>>>> time to > >>>>>>>>>>>>> fetch > >>>>>>>>>>>>>> from kafka.* > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> For example if i give "0" as input offset and it is > >>> taking > >>>>>> time > >>>>>>> as > >>>>>>>>>>>>> below (6 > >>>>>>>>>>>>>> seconds) and not getting 500 records also,it is > >> getting > >>>> only > >>>>>> 200 > >>>>>>>>>>>>> records > >>>>>>>>>>>>>> per poll and taking lot of time...why this is > >> happening > >>>> and > >>>>>> how > >>>>>>> to > >>>>>>>>>>>>> avoid > >>>>>>>>>>>>>> this. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Poll Records Count :292 Time taken :1227 ms > >>>>>>>>>>>>>> Poll Records Count :292 Time taken :1181 ms > >>>>>>>>>>>>>> Poll Records Count :296 Time taken:1234 ms > >>>>>>>>>>>>>> Poll Records Count :292 Time taken:1133 ms > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> *If I give an offset as 110999 and it is getting some > >>> fast > >>>>> and > >>>>>>>>> records > >>>>>>>>>>>>>> getting as 500 each..Why this difference please.* > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Poll Records Count :500 Time taken:1284 ms > >>>>>>>>>>>>>> Poll Records Count :500 Time taken:3 ms > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Please give your suggestion on this. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>> Giridar > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> >
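A minimal, untested sketch of a consumer factory that applies the tuning suggested in this thread: fetch.min.bytes sized to roughly 'average record size * 1000' (a 1750-byte estimate times 1000 records matches the 1750000 value tried above), max.partition.fetch.bytes raised to the same value, fetch.max.wait.ms of 1500 ms, and max.poll.records of 1000. The class name TunedConsumerFactory, the bootstrap-server string, and the record-size estimate are illustrative placeholders, not values confirmed by the thread.

```
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public final class TunedConsumerFactory {

    public static KafkaConsumer<String, String> create(String bootstrapServers,
                                                       int avgRecordSizeBytes,
                                                       int recordsPerFetch) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Random reads use assign()/seek(), so no consumer group is needed
        // and auto-commit stays disabled.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Ask the broker to hold the fetch until roughly one batch's worth of
        // bytes is available, or fetch.max.wait.ms elapses, whichever comes first.
        int minBytes = avgRecordSizeBytes * recordsPerFetch;
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, String.valueOf(minBytes));
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, String.valueOf(minBytes));
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "1500");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(recordsPerFetch));
        return new KafkaConsumer<>(props);
    }
}
```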
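And a sketch of the random-read loop on top of that factory, with the stray closing brace from the snippet earlier in the thread removed and an overall time budget added so a slow fetch cannot stall the request indefinitely. The method name readFrom, the "Test" topic and partition 0, the 10-second budget, and localhost:9092 are assumptions for illustration only; commitSync is omitted because the caller supplies the offsets.

```
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public final class RandomReadExample {

    public static List<String> readFrom(long readOffset) {
        TopicPartition tp = new TopicPartition("Test", 0);          // assumed topic/partition
        List<String> msglist = new ArrayList<>();
        long deadline = System.currentTimeMillis() + 10_000;         // overall time budget
        try (KafkaConsumer<String, String> consumer =
                 TunedConsumerFactory.create("localhost:9092", 1750, 1000)) {
            consumer.assign(Collections.singletonList(tp));           // no consumer group needed
            consumer.seek(tp, readOffset);                            // start at the requested offset
            boolean end = false;
            do {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2000));
                for (ConsumerRecord<String, String> record : records) {
                    msglist.add(record.value());
                }
                // Exit once 1000 records are collected or the time budget is spent.
                end = msglist.size() >= 1000 || System.currentTimeMillis() >= deadline;
            } while (!end);
        }
        return msglist;
    }
}
```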