Hi Giridar,

It seems your use case involves random reads, and you expect the consumer to return 1000 records from the server at once. You could therefore increase the wait time (fetch.max.wait.ms) and the minimum fetch size (fetch.min.bytes) so that each response is larger and carries as many records as possible.
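As a rough sketch of that kind of tuning (an illustration, not a tested recommendation), the settings could be assembled as below. The class and method names (`FetchTuning`, `tunedConsumerProps`), the broker address, and the 1 KiB average record size are placeholders made up for this example; only the configuration keys are real consumer settings. The sketch also raises max.poll.records, because a single poll() otherwise returns at most 500 records by default.

```java
import java.util.Properties;

// Hypothetical helper: builds consumer settings aimed at getting roughly
// `targetRecords` records back from a single fetch. Only the config keys
// are real Kafka consumer settings; names and values are illustrative.
class FetchTuning {

    // Default value of max.partition.fetch.bytes (1 MiB).
    static final int DEFAULT_MAX_PARTITION_FETCH_BYTES = 1_048_576;

    static Properties tunedConsumerProps(String bootstrapServers,
                                         int avgRecordBytes,
                                         int targetRecords) {
        // "average size of records * 1000" from the advice above;
        // multiplyExact fails loudly instead of overflowing silently.
        int minBytes = Math.multiplyExact(avgRecordBytes, targetRecords);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("enable.auto.commit", "false");

        // Ask the broker to hold the response until this many bytes are ready...
        props.setProperty("fetch.min.bytes", Integer.toString(minBytes));
        // ...or until this much time has passed, whichever comes first.
        props.setProperty("fetch.max.wait.ms", "1500");
        // The per-partition cap must not be smaller than the requested minimum.
        props.setProperty("max.partition.fetch.bytes",
                Integer.toString(Math.max(DEFAULT_MAX_PARTITION_FETCH_BYTES, minBytes)));
        // Let one poll() hand back the whole batch (the default cap is 500).
        props.setProperty("max.poll.records", Integer.toString(targetRecords));
        return props;
    }

    public static void main(String[] args) {
        // Assumed figures: 1 KiB average record size, 1000 records wanted.
        Properties props = tunedConsumerProps("localhost:9092", 1024, 1000);
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting Properties can be passed to the KafkaConsumer constructor. One trade-off to keep in mind: with fetch.min.bytes this high, a fetch close to the end of the log may wait up to fetch.max.wait.ms before the broker answers.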
A suitable value for fetch.min.bytes might be 'average size of records * 1000'. You may also need to increase max.partition.fetch.bytes, since it caps how much data the broker returns per partition. For fetch.max.wait.ms, a value of 1500 ms might match your expectations.

Best,
Chia-Ping

On 2024/11/24 10:55:23 giri mungi wrote:
> Hi Yang,
>
> *This is the old code, which works perfectly well and returns all 1000
> records in less than 3 seconds.*
>
> do {
>     FetchRequest req = new FetchRequestBuilder().clientId(clientName)
>         .addFetch(a_topic, a_partition, readOffset, fetchSize)
>         .build();
>
>     FetchResponse fetchResponse = consumer.fetch(req);
>
>     JSONObject obj = null;
>     ByteBufferMessageSet set = fetchResponse.messageSet(a_topic,
>         a_partition);
>
>     for (MessageAndOffset messageAndOffset : set) {
>         String message = null;
>         if (messageAndOffset.offset() < readOffset) {
>             log.warn("Found an old offset: {} ,Expecting: {}",
>                 messageAndOffset.offset(), readOffset);
>             continue;
>         }
>
>         ByteBuffer payload = messageAndOffset.message().payload();
>
>         byte[] bytes = new byte[payload.limit()];
>         payload.get(bytes);
>         try {
>             message = new String(bytes, "UTF-8");
>         } catch (UnsupportedEncodingException ue) {
>             log.warn(ue.getMessage(), ue);
>             message = new String(bytes);
>         }
>
>         obj = new JSONObject(message);
>         msglist.add(new JSONObject(message));
>     }
>     readOffset = messageAndOffset.nextOffset();
>
>     if (msglist.size() >= Math.round(limit
>             / inputReq.getApplicationArea().getReqInfo().size())
>             || (endTime - startTime) >= waitTime) {
>         log.info(
>             "Wait condition has been met... exiting the fetch loop. recordCount - {}, time exhausted - {} ms.",
>             msglist.size(), (endTime - startTime));
>         end = true;
>     }
>
> } while (!end);
>
>
> *The new code is taking 8 seconds..
> which is more than double the time*
>
> TopicPartition topicPartition = new TopicPartition("Test", 0);
> consumer.seekToBeginning(Collections.singletonList(topicPartition));
> long kafkaEarliestOffset = consumer.position(topicPartition);
> try (KafkaConsumer<String, String> consumer =
>         KafkaConsumerFactory.createConsumer(clientName, fetchSize)) {
>     consumer.partitionsFor(topicName);
>     consumer.assign(Collections.singletonList(topicPartition));
>     consumer.seek(topicPartition, readOffset);
>     do {
>         ConsumerRecords<String, String> records =
>             consumer.poll(Duration.ofMillis(1500));
>     } while (!end)
>
> public static KafkaConsumer<String,String> createConsumer(String
>         clientName,int fetchSize) {
>     Properties props = new Properties();
>     String kafkaBrokerStr =
>         Config.getConsumerPropValue("kafkabrokerslist");
>     String groupId = Config.getConsumerPropValue("group.id");
>     props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>         StringDeserializer.class.getName());
>
>     props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>         StringDeserializer.class.getName());
>     props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
>     props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
>         "false");
>     props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
>         "earliest");
>     props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
>     props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"500");
>     props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
>     return new KafkaConsumer<String,String>(props);
> }
>
> How can I improve the performance of this?
>
> On Sun, Nov 24, 2024 at 4:11 PM giri mungi <girimung...@gmail.com> wrote:
>
> > Hi Yang,
> >
> > Can I get the records from Kafka as bytes or in compressed form, so that
> > fetching them from Kafka takes less time?
> >
> > I can build messages from those bytes. Is that possible?
> >
> > Can you please give suggestions on this.
> >
> > Thanks,
> > Giridar
> >
> > On Sun, Nov 24, 2024 at 3:50 PM giri mungi <girimung...@gmail.com> wrote:
> >
> >> Hi Yang,
> >>
> >> Thanks for your reply.
> >>
> >> Now what should I do to improve my performance? The old Kafka code
> >> performed well.
> >>
> >> These are the properties:
> >>
> >> props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
> >> props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
> >>     "false");
> >> props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
> >>     "earliest");
> >> props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
> >> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"500");
> >> props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
> >>
> >> How can I decrease this time? Please suggest.
> >>
> >> Thanks,
> >> Giridar
> >>
> >> On Sun, Nov 24, 2024 at 3:15 PM PoAn Yang <yangp...@gmail.com> wrote:
> >>
> >>> Hi Giridar,
> >>>
> >>> > *Code explanation: Fetching records is taking time for the first poll.*
> >>> > Poll Records Count: 500 diff: 1284
> >>> > Poll Records Count: 500 diff: 3
> >>> >
> >>> > For the first 500 records it took 1284 ms and for the next 500 records
> >>> > it took 3 ms.
> >>> >
> >>> > *Why this much difference? I would like to improve the performance of
> >>> > the first poll.*
> >>>
> >>> IIRC, a consumer has a FetchBuffer (cache).
> >>> If a FetchResponse can return more records than `max.poll.records`, the
> >>> extra records will be stored in the FetchBuffer.
> >>> That is why the second poll can get data in a short time.
> >>> IMO, we don’t need to balance each poll time, because that would mean we
> >>> don’t want a local cache.
> >>>
> >>> > For example, if I give "0" as the input offset, it takes the time shown
> >>> > below (6 seconds) and does not even return 500 records; it gets fewer
> >>> > than 300 records per poll and takes a lot of time. Why is this
> >>> > happening, and how can I avoid it?
> >>> >
> >>> > Poll Records Count :292 Time taken :1227 ms
> >>> > Poll Records Count :292 Time taken :1181 ms
> >>> > Poll Records Count :296 Time taken:1234 ms
> >>> > Poll Records Count :292 Time taken:1133 ms
> >>> >
> >>> > *If I give 110999 as the offset, it is somewhat faster and each poll
> >>> > returns 500 records. Why this difference, please?*
> >>> >
> >>> > Poll Records Count :500 Time taken:1284 ms
> >>> > Poll Records Count :500 Time taken:3 ms
> >>>
> >>> IIUC, a FetchRequest is limited by `fetch.max.bytes`.
> >>> If the records from offset “0” are bigger than those from offset “110999”,
> >>> then a FetchResponse returns fewer records.
> >>>
> >>> Please correct me if I misunderstood anything.
> >>>
> >>> Thanks,
> >>> PoAn
> >>>
> >>> > On Nov 24, 2024, at 2:09 PM, giri mungi <girimung...@gmail.com> wrote:
> >>> >
> >>> > Hi Team,
> >>> >
> >>> > Good day to you.
> >>> >
> >>> > I am Giridhar. I need your suggestions on Kafka
> >>> > performance improvement, please.
> >>> >
> >>> > *Scenario: The user gives an offset as input and, based on that
> >>> > offset, we need to return the next 1000 messages from the Kafka topic
> >>> > and the next offset. The Kafka topic contains only one partition.*
> >>> >
> >>> > We are trying to migrate from old Kafka to new Kafka. In the old Kafka
> >>> > we were using code like:
> >>> >
> >>> > *Old code (kafka clients 0.8.1):*
> >>> >
> >>> > FetchRequest req = new
> >>> >     FetchRequestBuilder().clientId(clientName).addFetch(a_topic,
> >>> >     a_partition, readOffset, fetchSize).build();
> >>> > FetchResponse fetchResponse = consumer.fetch(req);
> >>> > ByteBufferMessageSet set = fetchResponse.messageSet(a_topic,
> >>> >     a_partition);
> >>> >
> >>> > This code is super fast. We are trying to achieve the same using the
> >>> > KafkaConsumer API, but it is slow.
> >>> >
> >>> > *New KafkaConsumer code (kafka clients 3.6.1):*
> >>> >
> >>> > TopicPartition topicPartition = new TopicPartition("Test", 0);
> >>> > consumer.seekToBeginning(Collections.singletonList(topicPartition));
> >>> > long kafkaEarliestOffset = consumer.position(topicPartition);
> >>> > try (KafkaConsumer<String, String> consumer =
> >>> >         KafkaConsumerFactory.createConsumer(clientName, fetchSize)) {
> >>> >     consumer.assign(Collections.singletonList(topicPartition));
> >>> >     consumer.seek(topicPartition, readOffset);
> >>> >     do {
> >>> >         ConsumerRecords<String, String> records =
> >>> >             consumer.poll(Duration.ofMillis(1500));
> >>> >     } while (!end)
> >>> >
> >>> > public static KafkaConsumer<String,String> createConsumer(String
> >>> >         clientName,int fetchSize) {
> >>> >     Properties props = new Properties();
> >>> >     String kafkaBrokerStr =
> >>> >         Config.getConsumerPropValue("kafkabrokerlist");
> >>> >
> >>> >     props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> >>> >         StringDeserializer.class.getName());
> >>> >
> >>> >     props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> >>> >         StringDeserializer.class.getName());
> >>> >     props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
> >>> >     props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
> >>> >         "false");
> >>> >     props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
> >>> >         "earliest");
> >>> >     props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
> >>> >     props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"500");
> >>> >     props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
> >>> >     return new KafkaConsumer<String,String>(props);
> >>> > }
> >>> >
> >>> > *Code explanation: Fetching records is taking time for the first poll.*
> >>> > Poll Records Count: 500 diff: 1284
> >>> > Poll Records Count: 500 diff: 3
> >>> >
> >>> > For the first 500 records it took 1284 ms and for the next 500 records
> >>> > it took 3 ms.
> >>> >
> >>> > *Why this much difference? I would like to improve the performance of
> >>> > the first poll.*
> >>> >
> >>> >
> >>> > 1) How can I fetch the first 500 records in less time?
> >>> >
> >>> > *I am also seeing one strange issue. My Kafka topic, which has one
> >>> > partition, contains some 5 lakh (500,000) records. The records at the
> >>> > start take more time to fetch from Kafka.*
> >>> >
> >>> > For example, if I give "0" as the input offset, it takes the time shown
> >>> > below (6 seconds) and does not even return 500 records; it gets fewer
> >>> > than 300 records per poll and takes a lot of time. Why is this
> >>> > happening, and how can I avoid it?
> >>> >
> >>> > Poll Records Count :292 Time taken :1227 ms
> >>> > Poll Records Count :292 Time taken :1181 ms
> >>> > Poll Records Count :296 Time taken:1234 ms
> >>> > Poll Records Count :292 Time taken:1133 ms
> >>> >
> >>> > *If I give 110999 as the offset, it is somewhat faster and each poll
> >>> > returns 500 records. Why this difference, please?*
> >>> >
> >>> > Poll Records Count :500 Time taken:1284 ms
> >>> > Poll Records Count :500 Time taken:3 ms
> >>> >
> >>> >
> >>> >
> >>> > Please give your suggestions on this.
> >>> >
> >>> > Regards,
> >>> > Giridar
> >>>
> >>>
>