Hi Ping,

My bad, commitSync is not required. We can ignore that.
I am calculating the diff as below:

long stime = Calendar.getInstance().getTimeInMillis();
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2000));
long etime = Calendar.getInstance().getTimeInMillis();
log.info("Poll Records Count :{} Diff :{}", records.count(), (etime - stime));

Thanks,
Giri

On Sun, Nov 24, 2024 at 10:48 PM Chia-Ping Tsai <chia7...@gmail.com> wrote:

> hi Giri
>
> 1. Why do you call `commitSync`? It seems your application does not use a
> consumer group, as your use case is random read, right?
> 2. How do you calculate "diff"?
>
> giri mungi <girimung...@gmail.com> wrote on Mon, Nov 25, 2024 at 12:50 AM:
>
>> Hi Ping,
>>
>> Please find the details below:
>>
>> 1) The Kafka broker version is 3.6.1.
>>
>> 2) Logic explanation:
>>
>> The code polls messages from Kafka using
>> consumer.poll(Duration.ofMillis(2000)).
>>
>> *Exit condition:* the loop exits once 1000 messages have been
>> collected; the end flag is then set to true and the loop terminates.
>>
>> boolean end = false;
>> consumer.seek(topicPartition, readOffset);
>> do {
>>     long stime = Calendar.getInstance().getTimeInMillis();
>>     ConsumerRecords<String, String> records =
>>         consumer.poll(Duration.ofMillis(2000));
>>     for (ConsumerRecord<String, String> consumerRecord : records) {
>>         String message = consumerRecord.value();
>>         msglist.add(new JSONObject(message));
>>     }
>>     if (msglist.size() >= 1000) {
>>         end = true;
>>         consumer.commitSync();
>>     }
>> } while (!end);
>>
>> Thanks,
>> Giri
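For reference, a self-contained sketch of the loop quoted above, with the mismatched braces fixed. This is only a sketch: it assumes a single assigned partition, the org.json JSONObject used in the thread, and that the caller supplies the partition, starting offset, and message limit. Per the later resolution in this thread, commitSync is omitted, since the starting offset is caller-supplied and the group offset is never used.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.json.JSONObject;

public final class OffsetRangeReader {

    // Reads messages from one partition, starting at readOffset, until at
    // least `limit` messages have been collected.
    public static List<JSONObject> read(KafkaConsumer<String, String> consumer,
                                        TopicPartition topicPartition,
                                        long readOffset,
                                        int limit) {
        List<JSONObject> msglist = new ArrayList<>();
        consumer.assign(Collections.singletonList(topicPartition));
        consumer.seek(topicPartition, readOffset);
        boolean end = false;
        do {
            long stime = System.currentTimeMillis();
            ConsumerRecords<String, String> records =
                consumer.poll(Duration.ofMillis(2000));
            long etime = System.currentTimeMillis();
            System.out.printf("Poll Records Count :%d Diff :%d%n",
                    records.count(), etime - stime);

            for (ConsumerRecord<String, String> record : records) {
                msglist.add(new JSONObject(record.value()));
            }
            // A production version should also break out when the end of the
            // partition is reached; otherwise this loop spins forever if fewer
            // than `limit` messages exist past readOffset.
            if (msglist.size() >= limit) {
                end = true;
            }
        } while (!end);
        return msglist;
    }
}
```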
>> On Sun, Nov 24, 2024 at 9:23 PM Chia-Ping Tsai <chia7...@gmail.com> wrote:
>>
>>> hi
>>>
>>> 1) Could you share the broker version with us?
>>> 2) Could you explain how the sample code works? What is "end"?
>>>
>>> ```
>>> do {
>>>     ConsumerRecords<String, String> records =
>>>         consumer.poll(Duration.ofMillis(1500));
>>> } while (!end)
>>> ```
>>>
>>> thanks,
>>> Chia-Ping
>>>
>>> giri mungi <girimung...@gmail.com> wrote on Sun, Nov 24, 2024 at 11:15 PM:
>>>
>>>> Hi,
>>>>
>>>> All of them are from the same consumer in each poll.
>>>>
>>>> Before polling we set the offset to the user-supplied offset and try
>>>> to consume the next 1000 messages.
>>>>
>>>> Thanks,
>>>> Giridhar
>>>>
>>>> On Sun, 24 Nov 2024 at 8:40 PM, Chia-Ping Tsai <chia7...@gmail.com> wrote:
>>>>
>>>>> hi
>>>>>
>>>>>> Poll Records Count :0 diff :2004
>>>>>> Poll Records Count :500 diff :943
>>>>>
>>>>> Are they from the same consumer in each poll? Or are they based on
>>>>> different "offset" values and separate consumers' polls?
>>>>>
>>>>> thanks,
>>>>> Chia-Ping
>>>>>
>>>>> giri mungi <girimung...@gmail.com> wrote on Sun, Nov 24, 2024 at 8:51 PM:
>>>>>
>>>>>> Do I need to check any settings at the Kafka server level?
>>>>>>
>>>>>> On Sun, Nov 24, 2024 at 6:19 PM giri mungi <girimung...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi, I have set the properties as below:
>>>>>>>
>>>>>>> props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1750000");
>>>>>>> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
>>>>>>> props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "1500");
>>>>>>> props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1750000");
>>>>>>>
>>>>>>> Poll Records Count :0 diff :2004
>>>>>>> Poll Records Count :500 diff :943
>>>>>>> Poll Records Count :0 diff :2000
>>>>>>> Poll Records Count :44 diff :3
>>>>>>> Poll Records Count :500 diff :205
>>>>>>> Poll Records Count :488 diff :1978
>>>>>>>
>>>>>>> If I set max records to 500 or 1000, sometimes I get a count of
>>>>>>> "0" and the poll takes the full max wait time, which hurts the
>>>>>>> overall response time.
>>>>>>>
>>>>>>> *Even though I have set fetch.min.bytes and max.poll.records, I am
>>>>>>> getting "0" records, which is impacting the overall response time.*
>>>>>>>
>>>>>>> How can I avoid this? Please suggest.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Giridar
>>>>>>>
>>>>>>> On Sun, Nov 24, 2024 at 5:26 PM Chia-Ping Tsai <chia7...@apache.org> wrote:
>>>>>>>
>>>>>>>> hi Giridar
>>>>>>>>
>>>>>>>> It seems your use case involves random reads, and you expect the
>>>>>>>> consumer to return 1000 records from the server at once.
>>>>>>>> Therefore, you could increase the wait time (fetch.max.wait.ms)
>>>>>>>> and fetch size (fetch.min.bytes) to receive a larger response
>>>>>>>> with as many records as possible.
>>>>>>>>
>>>>>>>> A suitable value for fetch.min.bytes might be 'average size of
>>>>>>>> records * 1000'. You may also need to increase
>>>>>>>> max.partition.fetch.bytes. For fetch.max.wait.ms, a value of
>>>>>>>> 1500 ms might match your expectations.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Chia-Ping
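Translated into configuration, this suggestion might look like the sketch below. The average record size is an input the application has to measure; nothing here is a recommended absolute value.

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public final class FetchTuning {

    // Builds fetch settings aimed at returning roughly 1000 records per fetch.
    public static Properties tunedFetchProps(int avgRecordSizeBytes) {
        int targetBytes = avgRecordSizeBytes * 1000; // avg size * 1000, as suggested
        Properties props = new Properties();
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, String.valueOf(targetBytes));
        // The per-partition cap must be at least as large; with a
        // single-partition topic it otherwise bounds the whole response.
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, String.valueOf(targetBytes));
        // How long the broker may wait for fetch.min.bytes to accumulate.
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "1500");
        return props;
    }
}
```

The trade-off shows up in the numbers quoted earlier in the thread: when fewer than fetch.min.bytes are available on the broker (for example near the log end), the fetch is held for up to fetch.max.wait.ms before returning whatever is there, which is how a poll can come back late with 0 records.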
>>>>>>>> On 2024/11/24 10:55:23 giri mungi wrote:
>>>>>>>>
>>>>>>>>> Hi Yang,
>>>>>>>>>
>>>>>>>>> *This is the old code, which performs perfectly fine and returns
>>>>>>>>> all 1000 records in less than 3 seconds.*
>>>>>>>>>
>>>>>>>>> do {
>>>>>>>>>     FetchRequest req = new FetchRequestBuilder().clientId(clientName)
>>>>>>>>>         .addFetch(a_topic, a_partition, readOffset, fetchSize)
>>>>>>>>>         .build();
>>>>>>>>>     FetchResponse fetchResponse = consumer.fetch(req);
>>>>>>>>>
>>>>>>>>>     ByteBufferMessageSet set = fetchResponse.messageSet(a_topic, a_partition);
>>>>>>>>>     for (MessageAndOffset messageAndOffset : set) {
>>>>>>>>>         String message = null;
>>>>>>>>>         if (messageAndOffset.offset() < readOffset) {
>>>>>>>>>             log.warn("Found an old offset: {} ,Expecting: {}",
>>>>>>>>>                 messageAndOffset.offset(), readOffset);
>>>>>>>>>             continue;
>>>>>>>>>         }
>>>>>>>>>
>>>>>>>>>         ByteBuffer payload = messageAndOffset.message().payload();
>>>>>>>>>         byte[] bytes = new byte[payload.limit()];
>>>>>>>>>         payload.get(bytes);
>>>>>>>>>         try {
>>>>>>>>>             message = new String(bytes, "UTF-8");
>>>>>>>>>         } catch (UnsupportedEncodingException ue) {
>>>>>>>>>             log.warn(ue.getMessage(), ue);
>>>>>>>>>             message = new String(bytes);
>>>>>>>>>         }
>>>>>>>>>
>>>>>>>>>         msglist.add(new JSONObject(message));
>>>>>>>>>         readOffset = messageAndOffset.nextOffset();
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     if (msglist.size() >= Math.round(limit
>>>>>>>>>             / inputReq.getApplicationArea().getReqInfo().size())
>>>>>>>>>             || (endTime - startTime) >= waitTime) {
>>>>>>>>>         log.info(
>>>>>>>>>             "Wait condition has been met... exiting the fetch loop. recordCount - {}, time exhausted - {} ms.",
>>>>>>>>>             msglist.size(), (endTime - startTime));
>>>>>>>>>         end = true;
>>>>>>>>>     }
>>>>>>>>> } while (!end);
>>>>>>>>>
>>>>>>>>> *The new code takes 8 seconds, which is more than double the time:*
>>>>>>>>> TopicPartition topicPartition = new TopicPartition("Test", 0);
>>>>>>>>> try (KafkaConsumer<String, String> consumer =
>>>>>>>>>         KafkaConsumerFactory.createConsumer(clientName, fetchSize)) {
>>>>>>>>>     consumer.partitionsFor(topicName);
>>>>>>>>>     consumer.assign(Collections.singletonList(topicPartition));
>>>>>>>>>     consumer.seekToBeginning(Collections.singletonList(topicPartition));
>>>>>>>>>     long kafkaEarliestOffset = consumer.position(topicPartition);
>>>>>>>>>     consumer.seek(topicPartition, readOffset);
>>>>>>>>>     do {
>>>>>>>>>         ConsumerRecords<String, String> records =
>>>>>>>>>             consumer.poll(Duration.ofMillis(1500));
>>>>>>>>>     } while (!end);
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> public static KafkaConsumer<String, String> createConsumer(String clientName, int fetchSize) {
>>>>>>>>>     Properties props = new Properties();
>>>>>>>>>     String kafkaBrokerStr = Config.getConsumerPropValue("kafkabrokerslist");
>>>>>>>>>     String groupId = Config.getConsumerPropValue("group.id");
>>>>>>>>>     props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerStr);
>>>>>>>>>     props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>>>>>>>>>         StringDeserializer.class.getName());
>>>>>>>>>     props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>>>>>>>>>         StringDeserializer.class.getName());
>>>>>>>>>     props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
>>>>>>>>>     props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
>>>>>>>>>     props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>>>>>>>>     props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
>>>>>>>>>     props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
>>>>>>>>>     props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
>>>>>>>>>     return new KafkaConsumer<String, String>(props);
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> How can I improve the performance of this?
>>>>>>>>>
>>>>>>>>> On Sun, Nov 24, 2024 at 4:11 PM giri mungi <girimung...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Yang,
>>>>>>>>>>
>>>>>>>>>> Can I get the records from Kafka as bytes or in compressed form,
>>>>>>>>>> so that fetching from Kafka takes less time?
>>>>>>>>>>
>>>>>>>>>> I can build the messages from those bytes. Is that possible?
>>>>>>>>>>
>>>>>>>>>> Can you please give suggestions on this?
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Giridar
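On the raw-bytes question: the Java consumer can hand values back without String conversion by using ByteArrayDeserializer, deferring message construction to the application. Note that compression sits below the deserializer: the consumer client decompresses batches itself, so this saves deserialization work rather than network transfer. A sketch, with placeholder broker list, topic, and offset:

```java
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public final class RawBytesConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("Test", 0); // placeholder topic
            consumer.assign(Collections.singletonList(tp));
            consumer.seek(tp, 0L); // placeholder offset
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(2000));
            for (ConsumerRecord<byte[], byte[]> record : records) {
                // Build the String only when (and if) it is actually needed.
                String message = new String(record.value(), StandardCharsets.UTF_8);
                System.out.printf("offset=%d bytes=%d%n", record.offset(), message.length());
            }
        }
    }
}
```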
>>>>>>>>>> On Sun, Nov 24, 2024 at 3:50 PM giri mungi <girimung...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Yang,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for your reply.
>>>>>>>>>>>
>>>>>>>>>>> Now, what should I do to improve the performance? The old Kafka
>>>>>>>>>>> code performed well.
>>>>>>>>>>>
>>>>>>>>>>> These are the properties:
>>>>>>>>>>>
>>>>>>>>>>> props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
>>>>>>>>>>> props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
>>>>>>>>>>> props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>>>>>>>>>> props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
>>>>>>>>>>> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
>>>>>>>>>>> props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
>>>>>>>>>>>
>>>>>>>>>>> How can I decrease this time? Please suggest.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Giridar
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Nov 24, 2024 at 3:15 PM PoAn Yang <yangp...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Giridar,
>>>>>>>>>>>>
>>>>>>>>>>>>> *Code explanation: fetching records takes time on the first poll.*
>>>>>>>>>>>>> Poll Records Count: 500 diff: 1284
>>>>>>>>>>>>> Poll Records Count: 500 diff: 3
>>>>>>>>>>>>>
>>>>>>>>>>>>> For the first 500 records it took 1284 ms, and for the next
>>>>>>>>>>>>> 500 records it took 3 ms.
>>>>>>>>>>>>>
>>>>>>>>>>>>> *Why is there this much difference? I would like to improve
>>>>>>>>>>>>> the performance of the first poll.*
>>>>>>>>>>>>
>>>>>>>>>>>> IIRC, a consumer has a FetchBuffer (cache).
>>>>>>>>>>>> If a FetchResponse returns more records than `max.poll.records`,
>>>>>>>>>>>> the extra records are stored in the FetchBuffer.
>>>>>>>>>>>> That is why the second poll can get data in a short time.
>>>>>>>>>>>> IMO, we don't need to balance each poll time, because that
>>>>>>>>>>>> would mean giving up the local cache.
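A small harness in the spirit of the timings quoted in this thread can make that prefetch visible: the first poll pays the network round trip, while subsequent polls are typically served from records the consumer has already buffered. The consumer configuration and offset are placeholders, and exact timings will vary:

```java
import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public final class PollTimer {

    // Times a few consecutive polls. `consumer` is assumed to be configured
    // with max.poll.records=500 and a large fetch.min.bytes, as above.
    public static void timePolls(KafkaConsumer<String, String> consumer, TopicPartition tp) {
        consumer.assign(Collections.singletonList(tp));
        consumer.seek(tp, 110999L); // placeholder offset from the thread
        for (int i = 0; i < 4; i++) {
            long start = System.currentTimeMillis();
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2000));
            // Poll 0 usually includes the network fetch; later polls often
            // drain records already buffered client-side in a few ms.
            System.out.printf("Poll %d: %d records in %d ms%n",
                    i, records.count(), System.currentTimeMillis() - start);
        }
    }
}
```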
>>>>>>>>>>>>> For example, if I give "0" as the input offset, it takes
>>>>>>>>>>>>> about 6 seconds and does not even return 500 records; it gets
>>>>>>>>>>>>> only 200-300 records per poll and takes a lot of time. Why is
>>>>>>>>>>>>> this happening, and how can it be avoided?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Poll Records Count :292 Time taken :1227 ms
>>>>>>>>>>>>> Poll Records Count :292 Time taken :1181 ms
>>>>>>>>>>>>> Poll Records Count :296 Time taken :1234 ms
>>>>>>>>>>>>> Poll Records Count :292 Time taken :1133 ms
>>>>>>>>>>>>>
>>>>>>>>>>>>> *If I give an offset of 110999, it is faster and each poll
>>>>>>>>>>>>> returns 500 records. Why this difference?*
>>>>>>>>>>>>>
>>>>>>>>>>>>> Poll Records Count :500 Time taken :1284 ms
>>>>>>>>>>>>> Poll Records Count :500 Time taken :3 ms
>>>>>>>>>>>>
>>>>>>>>>>>> IIUC, a FetchRequest is limited by `fetch.max.bytes`.
>>>>>>>>>>>> If the records at offset "0" are bigger than those at offset
>>>>>>>>>>>> "110999", then a FetchResponse returns fewer records.
>>>>>>>>>>>>
>>>>>>>>>>>> Please correct me if I misunderstood anything.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> PoAn
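One way to check this record-size theory is to measure the serialized sizes the consumer actually fetched; ConsumerRecord exposes them directly. A sketch, which also yields the 'average size of records' input for the fetch.min.bytes suggestion earlier in the thread:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public final class RecordSizes {

    // Average serialized value size (bytes) across one poll's records.
    // Returns 0 for an empty poll; null values (size -1) are skipped.
    public static long averageValueSize(ConsumerRecords<String, String> records) {
        long total = 0;
        int count = 0;
        for (ConsumerRecord<String, String> record : records) {
            int size = record.serializedValueSize();
            if (size >= 0) {
                total += size;
                count++;
            }
        }
        return count == 0 ? 0 : total / count;
    }
}
```

Comparing this average at offset 0 against offset 110999 would confirm or rule out the size explanation for the differing poll counts.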
>>>>>>>>>>>> On Nov 24, 2024, at 2:09 PM, giri mungi <girimung...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Team,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Good day to you.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am Giridhar. I need your suggestions on Kafka performance
>>>>>>>>>>>>> improvement, please.
>>>>>>>>>>>>>
>>>>>>>>>>>>> *Scenario: the user gives an offset as input, and based on
>>>>>>>>>>>>> that offset we need to return the next 1000 messages from the
>>>>>>>>>>>>> Kafka topic, plus the next offset. The Kafka topic contains
>>>>>>>>>>>>> only one partition.*
>>>>>>>>>>>>>
>>>>>>>>>>>>> We are trying to migrate from old Kafka to new Kafka. In the
>>>>>>>>>>>>> old Kafka we used code like this:
>>>>>>>>>>>>>
>>>>>>>>>>>>> *Old code (kafka-clients 0.8.1):*
>>>>>>>>>>>>>
>>>>>>>>>>>>> FetchRequest req = new FetchRequestBuilder().clientId(clientName)
>>>>>>>>>>>>>     .addFetch(a_topic, a_partition, readOffset, fetchSize).build();
>>>>>>>>>>>>> FetchResponse fetchResponse = consumer.fetch(req);
>>>>>>>>>>>>> ByteBufferMessageSet set = fetchResponse.messageSet(a_topic, a_partition);
>>>>>>>>>>>>>
>>>>>>>>>>>>> This code is super fast; we are trying to achieve the same
>>>>>>>>>>>>> with the KafkaConsumer API and are seeing slowness.
>>>>>>>>>>>>>
>>>>>>>>>>>>> *New KafkaConsumer code (kafka-clients 3.6.1):*
>>>>>>>>>>>>>
>>>>>>>>>>>>> TopicPartition topicPartition = new TopicPartition("Test", 0);
>>>>>>>>>>>>> try (KafkaConsumer<String, String> consumer =
>>>>>>>>>>>>>         KafkaConsumerFactory.createConsumer(clientName, fetchSize)) {
>>>>>>>>>>>>>     consumer.assign(Collections.singletonList(topicPartition));
>>>>>>>>>>>>>     consumer.seekToBeginning(Collections.singletonList(topicPartition));
>>>>>>>>>>>>>     long kafkaEarliestOffset = consumer.position(topicPartition);
>>>>>>>>>>>>>     consumer.seek(topicPartition, readOffset);
>>>>>>>>>>>>>     do {
>>>>>>>>>>>>>         ConsumerRecords<String, String> records =
>>>>>>>>>>>>>             consumer.poll(Duration.ofMillis(1500));
>>>>>>>>>>>>>     } while (!end);
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>> public static KafkaConsumer<String, String> createConsumer(String clientName, int fetchSize) {
>>>>>>>>>>>>>     Properties props = new Properties();
>>>>>>>>>>>>>     String kafkaBrokerStr = Config.getConsumerPropValue("kafkabrokerlist");
>>>>>>>>>>>>>     props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerStr);
>>>>>>>>>>>>>     props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>>>>>>>>>>>>>         StringDeserializer.class.getName());
>>>>>>>>>>>>>     props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>>>>>>>>>>>>>         StringDeserializer.class.getName());
>>>>>>>>>>>>>     props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
>>>>>>>>>>>>>     props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
>>>>>>>>>>>>>     props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>>>>>>>>>>>>     props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
>>>>>>>>>>>>>     props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
>>>>>>>>>>>>>     props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
>>>>>>>>>>>>>     return new KafkaConsumer<String, String>(props);
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>> *Code explanation: fetching records takes time on the first poll.*
>>>>>>>>>>>>> Poll Records Count: 500 diff: 1284
>>>>>>>>>>>>> Poll Records Count: 500 diff: 3
>>>>>>>>>>>>>
>>>>>>>>>>>>> For the first 500 records it took 1284 ms, and for the next
>>>>>>>>>>>>> 500 records it took 3 ms.
>>>>>>>>>>>>>
>>>>>>>>>>>>> *Why is there this much difference? I would like to improve
>>>>>>>>>>>>> the performance of the first poll.*
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1) How can we fetch the first 500 records in less time?
>>>>>>>>>>>>>
>>>>>>>>>>>>> *I am also seeing one strange issue. My Kafka topic, which has
>>>>>>>>>>>>> one partition, contains some 5 lakh (500,000) records. The
>>>>>>>>>>>>> earliest records take more time to fetch from Kafka.*
>>>>>>>>>>>>>
>>>>>>>>>>>>> For example, if I give "0" as the input offset, it takes about
>>>>>>>>>>>>> 6 seconds and does not even return 500 records; it gets only
>>>>>>>>>>>>> 200-300 records per poll and takes a lot of time. Why is this
>>>>>>>>>>>>> happening, and how can it be avoided?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Poll Records Count :292 Time taken :1227 ms
>>>>>>>>>>>>> Poll Records Count :292 Time taken :1181 ms
>>>>>>>>>>>>> Poll Records Count :296 Time taken :1234 ms
>>>>>>>>>>>>> Poll Records Count :292 Time taken :1133 ms
>>>>>>>>>>>>>
>>>>>>>>>>>>> *If I give an offset of 110999, it is faster and each poll
>>>>>>>>>>>>> returns 500 records. Why this difference?*
>>>>>>>>>>>>>
>>>>>>>>>>>>> Poll Records Count :500 Time taken :1284 ms
>>>>>>>>>>>>> Poll Records Count :500 Time taken :3 ms
>>>>>>>>>>>>>
>>>>>>>>>>>>> Please give your suggestions on this.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>> Giridar