Hi Yang,

*This is the old code, which works fine and returns all 1000 records in under 3 seconds:*
do {
    FetchRequest req = new FetchRequestBuilder().clientId(clientName)
            .addFetch(a_topic, a_partition, readOffset, fetchSize)
            .build();
    FetchResponse fetchResponse = consumer.fetch(req);
    ByteBufferMessageSet set = fetchResponse.messageSet(a_topic, a_partition);
    for (MessageAndOffset messageAndOffset : set) {
        String message = null;
        if (messageAndOffset.offset() < readOffset) {
            log.warn("Found an old offset: {} , Expecting: {}", messageAndOffset.offset(), readOffset);
            continue;
        }
        ByteBuffer payload = messageAndOffset.message().payload();
        byte[] bytes = new byte[payload.limit()];
        payload.get(bytes);
        try {
            message = new String(bytes, "UTF-8");
        } catch (UnsupportedEncodingException ue) {
            log.warn(ue.getMessage(), ue);
            message = new String(bytes);
        }
        msglist.add(new JSONObject(message));
        // advance inside the loop so the next fetch starts after the last message read
        readOffset = messageAndOffset.nextOffset();
    }
    // refresh the elapsed-time check on each iteration
    endTime = System.currentTimeMillis();
    if (msglist.size() >= Math.round(limit / inputReq.getApplicationArea().getReqInfo().size())
            || (endTime - startTime) >= waitTime) {
        log.info("Wait condition has been met... exiting the fetch loop. recordCount - {}, time exhausted - {} ms.",
                msglist.size(), (endTime - startTime));
        end = true;
    }
} while (!end);

*The new code takes about 8 seconds, which is more than double the time:*

try (KafkaConsumer<String, String> consumer = KafkaConsumerFactory.createConsumer(clientName, fetchSize)) {
    TopicPartition topicPartition = new TopicPartition("Test", 0);
    consumer.partitionsFor(topicName);
    consumer.assign(Collections.singletonList(topicPartition));
    // seekToBeginning/position need an assigned consumer, so they run inside the try block
    consumer.seekToBeginning(Collections.singletonList(topicPartition));
    long kafkaEarliestOffset = consumer.position(topicPartition);
    consumer.seek(topicPartition, readOffset);
    do {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1500));
        // ... build msglist from records and set `end`, as in the old loop ...
    } while (!end);
}

public static KafkaConsumer<String, String> createConsumer(String clientName, int fetchSize) {
    Properties props = new Properties();
    String kafkaBrokerStr = Config.getConsumerPropValue("kafkabrokerslist");
    String groupId = Config.getConsumerPropValue("group.id");
    // bootstrap.servers was read but never set; without it the consumer cannot connect
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerStr);
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
    props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
    return new KafkaConsumer<>(props);
}

How can I improve the performance of this?
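One knob that might matter here, going by PoAn's notes in the quoted thread below on fetch sizing, is how much data each fetch request may carry: if a single fetch can return the whole page, the first poll needs only one broker round trip. A minimal sketch of a tuned factory, assuming String values as elsewhere in the thread; the property values are illustrative guesses to tune against real message sizes, not settings from this thread:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public static KafkaConsumer<String, String> createTunedConsumer(String kafkaBrokerStr) {
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerStr);
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // Return immediately with whatever is available instead of waiting up to
    // fetch.max.wait.ms for fetch.min.bytes to accumulate on the broker.
    props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1");
    // Let one poll hand back the whole 1000-record page (illustrative value).
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
    // Allow larger batches per partition per fetch; the default is 1 MiB (illustrative value).
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, String.valueOf(8 * 1024 * 1024));
    return new KafkaConsumer<>(props);
}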
On Sun, Nov 24, 2024 at 4:11 PM giri mungi <girimung...@gmail.com> wrote:

> Hi Yang,
>
> Can I get the records from Kafka as bytes or in compressed form, so that
> it takes less time from Kafka?
>
> I can build the messages from those bytes. Is that possible?
>
> Can you please give suggestions on this.
>
> Thanks,
> Giridar
>
> On Sun, Nov 24, 2024 at 3:50 PM giri mungi <girimung...@gmail.com> wrote:
>
>> Hi Yang,
>>
>> Thanks for your reply.
>>
>> Now what should I do to improve my performance? The old Kafka code
>> performed well.
>>
>> These are the properties:
>>
>> props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
>> props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
>> props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>> props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
>> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
>> props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
>>
>> How can I decrease this time? Please suggest.
>>
>> Thanks,
>> Giridar
>>
>> On Sun, Nov 24, 2024 at 3:15 PM PoAn Yang <yangp...@gmail.com> wrote:
>>
>>> Hi Giridar,
>>>
>>> > *Code explanation: Fetching records is taking time for the first poll.*
>>> > Poll Records Count: 500 diff: 1284
>>> > Poll Records Count: 500 diff: 3
>>> >
>>> > For the first 500 records it took 1284 ms; the next 500 records took 3 ms.
>>> >
>>> > *Why this much difference? I would like to improve the performance of
>>> > the first poll.*
>>>
>>> IIRC, a consumer has a FetchBuffer (cache).
>>> If a FetchResponse can return more records than `max.poll.records`, the
>>> extra records are stored in the FetchBuffer.
>>> That is why the second poll can get data in a short time.
>>> IMO, we don't need to balance each poll time, because that would mean we
>>> don't want a local cache.
>>>
>>> > For example, if I give "0" as the input offset, it takes time as below
>>> > (6 seconds) and does not even get 500 records; it gets only ~290 records
>>> > per poll and takes a lot of time. Why is this happening and how can I
>>> > avoid it?
>>> >
>>> > Poll Records Count: 292 Time taken: 1227 ms
>>> > Poll Records Count: 292 Time taken: 1181 ms
>>> > Poll Records Count: 296 Time taken: 1234 ms
>>> > Poll Records Count: 292 Time taken: 1133 ms
>>> >
>>> > *If I give an offset of 110999, it is faster and gets 500 records per
>>> > poll. Why this difference, please?*
>>> >
>>> > Poll Records Count: 500 Time taken: 1284 ms
>>> > Poll Records Count: 500 Time taken: 3 ms
>>>
>>> IIUC, a FetchRequest has a limit from `fetch.max.bytes`.
>>> If the record size from offset "0" is bigger than from offset "110999",
>>> then a FetchResponse returns fewer records.
>>>
>>> Please correct me if I misunderstood anything.
>>>
>>> Thanks,
>>> PoAn
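PoAn's FetchBuffer point can be observed directly by timing consecutive polls: the first poll pays for the broker round trip, while later polls often drain records already prefetched into the consumer's internal buffer. A small sketch, assuming a consumer that is already assigned and seeked; the helper method itself is illustrative, not code from the thread:

import java.time.Duration;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

static void timePolls(KafkaConsumer<String, String> consumer, int polls) {
    for (int i = 1; i <= polls; i++) {
        long start = System.currentTimeMillis();
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1500));
        long elapsed = System.currentTimeMillis() - start;
        // Expected pattern per the thread: the first poll takes ~1200 ms
        // (network fetch), later polls take a few ms (prefetch buffer).
        System.out.printf("Poll %d: %d records in %d ms%n", i, records.count(), elapsed);
    }
}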
>>>
>>> > On Nov 24, 2024, at 2:09 PM, giri mungi <girimung...@gmail.com> wrote:
>>> >
>>> > Hi Team,
>>> >
>>> > Good day to you.
>>> >
>>> > I am Giridhar. I need your suggestions on Kafka performance
>>> > improvement, please.
>>> >
>>> > *Scenario: the user gives an offset as input, and based on that offset
>>> > we need to return the next 1000 messages from the Kafka topic, plus the
>>> > next offset. The Kafka topic contains only one partition.*
>>> >
>>> > We are trying to migrate from old Kafka to new Kafka. In the old Kafka
>>> > we were using code like:
>>> >
>>> > *Old code (kafka-clients 0.8.1):*
>>> >
>>> > FetchRequest req = new FetchRequestBuilder().clientId(clientName)
>>> >         .addFetch(a_topic, a_partition, readOffset, fetchSize).build();
>>> > FetchResponse fetchResponse = consumer.fetch(req);
>>> > ByteBufferMessageSet set = fetchResponse.messageSet(a_topic, a_partition);
>>> >
>>> > This code is super fast; we are trying to achieve the same with the
>>> > KafkaConsumer API and are seeing slowness.
>>> >
>>> > *The new KafkaConsumer code (kafka-clients 3.6.1):*
>>> >
>>> > TopicPartition topicPartition = new TopicPartition("Test", 0);
>>> > consumer.seekToBeginning(Collections.singletonList(topicPartition));
>>> > long kafkaEarliestOffset = consumer.position(topicPartition);
>>> > try (KafkaConsumer<String, String> consumer =
>>> >         KafkaConsumerFactory.createConsumer(clientName, fetchSize)) {
>>> >     consumer.assign(Collections.singletonList(topicPartition));
>>> >     consumer.seek(topicPartition, readOffset);
>>> >     do {
>>> >         ConsumerRecords<String, String> records =
>>> >                 consumer.poll(Duration.ofMillis(1500));
>>> >     } while (!end);
>>> > }
>>> >
>>> > public static KafkaConsumer<String, String> createConsumer(String clientName, int fetchSize) {
>>> >     Properties props = new Properties();
>>> >     String kafkaBrokerStr = Config.getConsumerPropValue("kafkabrokerlist");
>>> >     props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>>> >             StringDeserializer.class.getName());
>>> >     props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>>> >             StringDeserializer.class.getName());
>>> >     props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
>>> >     props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
>>> >     props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>> >     props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
>>> >     props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
>>> >     props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
>>> >     return new KafkaConsumer<String, String>(props);
>>> > }
>>> >
>>> > *Code explanation: fetching records takes time for the first poll.*
>>> > Poll Records Count: 500 diff: 1284
>>> > Poll Records Count: 500 diff: 3
>>> >
>>> > For the first 500 records it took 1284 ms; the next 500 records took 3 ms.
>>> >
>>> > *Why this much difference? I would like to improve the performance of
>>> > the first poll.*
>>> >
>>> > 1) How can I fetch the first 500 records in less time?
>>> >
>>> > *I am also seeing one strange issue. My Kafka topic, which has one
>>> > partition, contains some 5 lakh (500,000) records.* *The starting
>>> > records take more time to fetch from Kafka.*
>>> >
>>> > For example, if I give "0" as the input offset, it takes time as below
>>> > (6 seconds) and does not even get 500 records; it gets only ~290 records
>>> > per poll and takes a lot of time. Why is this happening and how can I
>>> > avoid it?
>>> >
>>> > Poll Records Count: 292 Time taken: 1227 ms
>>> > Poll Records Count: 292 Time taken: 1181 ms
>>> > Poll Records Count: 296 Time taken: 1234 ms
>>> > Poll Records Count: 292 Time taken: 1133 ms
>>> >
>>> > *If I give an offset of 110999, it is faster and gets 500 records per
>>> > poll. Why this difference, please?*
>>> >
>>> > Poll Records Count: 500 Time taken: 1284 ms
>>> > Poll Records Count: 500 Time taken: 3 ms
>>> >
>>> > Please give your suggestions on this.
>>> >
>>> > Regards,
>>> > Giridar
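For the scenario in the original message (return the next 1000 messages after a user-supplied offset, plus the next offset), a complete poll loop might look like the sketch below. It reuses the thread's KafkaConsumerFactory.createConsumer; the method name fetchPage, the 500 ms poll timeout, and the deadline handling are illustrative assumptions, not code from the thread:

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Sketch: collect up to `limit` messages starting at `readOffset`, or whatever
// arrived before `waitTimeMs` elapses, mirroring the old loop's exit condition.
static List<String> fetchPage(String topic, int partition, long readOffset,
                              int limit, long waitTimeMs, String clientName, int fetchSize) {
    List<String> messages = new ArrayList<>();
    long nextOffset = readOffset;
    try (KafkaConsumer<String, String> consumer =
             KafkaConsumerFactory.createConsumer(clientName, fetchSize)) {
        TopicPartition tp = new TopicPartition(topic, partition);
        consumer.assign(Collections.singletonList(tp));
        consumer.seek(tp, readOffset);
        long deadline = System.currentTimeMillis() + waitTimeMs;
        while (messages.size() < limit && System.currentTimeMillis() < deadline) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                messages.add(record.value());
                // the offset to hand back to the caller for the next page
                nextOffset = record.offset() + 1;
                if (messages.size() >= limit) {
                    break;
                }
            }
        }
    }
    return messages;
}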