Hi Yang,

*This is the old code, which performs fine and returns all 1000 records in
under 3 seconds.*

do {
    FetchRequest req = new FetchRequestBuilder().clientId(clientName)
            .addFetch(a_topic, a_partition, readOffset, fetchSize)
            .build();

    FetchResponse fetchResponse = consumer.fetch(req);

    ByteBufferMessageSet set = fetchResponse.messageSet(a_topic, a_partition);

    for (MessageAndOffset messageAndOffset : set) {
        if (messageAndOffset.offset() < readOffset) {
            log.warn("Found an old offset: {}, expecting: {}",
                    messageAndOffset.offset(), readOffset);
            continue;
        }

        ByteBuffer payload = messageAndOffset.message().payload();
        byte[] bytes = new byte[payload.limit()];
        payload.get(bytes);

        String message;
        try {
            message = new String(bytes, "UTF-8");
        } catch (UnsupportedEncodingException ue) {
            log.warn(ue.getMessage(), ue);
            message = new String(bytes);
        }

        msglist.add(new JSONObject(message));
        // advance readOffset inside the loop so it tracks the last message
        // actually consumed (it sat outside the loop, where messageAndOffset
        // is out of scope)
        readOffset = messageAndOffset.nextOffset();
    }

    if (msglist.size() >= Math.round(limit
            / inputReq.getApplicationArea().getReqInfo().size())
            || (endTime - startTime) >= waitTime) {
        log.info("Wait condition has been met... exiting the fetch loop. "
                + "recordCount - {}, time exhausted - {} ms.",
                msglist.size(), (endTime - startTime));
        end = true;
    }

} while (!end);


*The new code is taking 8 seconds, which is more than double the time.*

TopicPartition topicPartition = new TopicPartition("Test", 0);
try (KafkaConsumer<String, String> consumer =
        KafkaConsumerFactory.createConsumer(clientName, fetchSize)) {
    consumer.partitionsFor("Test"); // warm up topic metadata
    consumer.assign(Collections.singletonList(topicPartition));
    // seekToBeginning()/position() must run after assign(), on this consumer
    consumer.seekToBeginning(Collections.singletonList(topicPartition));
    long kafkaEarliestOffset = consumer.position(topicPartition);
    consumer.seek(topicPartition, readOffset);
    do {
        ConsumerRecords<String, String> records =
            consumer.poll(Duration.ofMillis(1500));
        // ... process records, advance readOffset, set end when done ...
    } while (!end);
}
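One thing I am wondering (an untested idea on my side): part of the first-poll
cost may be connection and metadata setup, so would a warm-up poll before the
timed loop help? Something like:

// hypothetical warm-up: pay the connect/metadata cost before the timed loop
consumer.poll(Duration.ofMillis(0));
// re-seek in case the warm-up poll advanced the position
consumer.seek(topicPartition, readOffset);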

public static KafkaConsumer<String, String> createConsumer(String clientName,
        int fetchSize) {
    Properties props = new Properties();
    String kafkaBrokerStr = Config.getConsumerPropValue("kafkabrokerslist");
    String groupId = Config.getConsumerPropValue("group.id");
    // kafkaBrokerStr and groupId were read but never used; wire them in
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerStr);
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
    props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
    return new KafkaConsumer<>(props);
}

How can I improve the performance of this?
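For example, would tuning the fetch settings help? This is the kind of change
I have in mind (a sketch on my side; the values are guesses, not measured):

props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1");                 // don't make the broker wait to batch
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "100");             // lower broker-side wait
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");             // drain more records per poll
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5242880"); // 5 MB per partition fetch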

On Sun, Nov 24, 2024 at 4:11 PM giri mungi <girimung...@gmail.com> wrote:

> Hi Yang,
>
> Can I get the records from Kafka as bytes or in compressed form, so that it
> takes less time to read from Kafka?
>
> I can build the messages from those bytes myself. Is that possible?
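>
> Something like this, maybe (a sketch on my side, assuming
> ByteArrayDeserializer is the right way to skip the String decoding):
>
> props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>         ByteArrayDeserializer.class.getName());
> // records would then be ConsumerRecords<String, byte[]>, and I could
> // decode or decompress the bytes myself only when needed.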
>
> Can you please give suggestions on this.
>
> Thanks,
> Giridar
>
> On Sun, Nov 24, 2024 at 3:50 PM giri mungi <girimung...@gmail.com> wrote:
>
>> Hi Yang,
>>
>> Thanks for your reply.
>>
>> Now, what should I do to improve the performance? The old Kafka code
>> performed well.
>>
>> These are the properties:
>>
>> props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
>> props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
>> props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>> props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
>> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
>> props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
>>
>> How can I decrease this time? Please suggest.
>>
>> Thanks,
>> Giridar
>>
>> On Sun, Nov 24, 2024 at 3:15 PM PoAn Yang <yangp...@gmail.com> wrote:
>>
>>> Hi Giridar,
>>>
>>> > *Code explanation: Fetching records is taking time for the first poll.*
>>> > Poll Records Count: 500 diff: 1284
>>> > Poll Records Count: 500 diff: 3
>>> >
>>> > For the first 500 records it took 1284 ms, and for the next 500 records
>>> > it took 3 ms.
>>> >
>>> > *Why is there such a difference? I would like to improve the first poll
>>> > time.*
>>>
>>> IIRC, a consumer has a FetchBuffer (cache).
>>> If a FetchResponse returns more records than `max.poll.records`, the
>>> extra records are stored in the FetchBuffer.
>>> That is why the second poll can get data in such a short time.
>>> IMO, we don't need to balance each poll time, because evening the polls
>>> out would mean giving up the local cache.
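>>>
>>> For example, if you want each poll() to drain more of the FetchBuffer in
>>> one go, you could raise the limit (a sketch; the value is arbitrary):
>>>
>>> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");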
>>>
>>> > For example, if I give "0" as the input offset, it takes a long time
>>> > (about 6 seconds) and does not even get 500 records; it gets fewer than
>>> > 300 records per poll. Why is this happening, and how can I avoid it?
>>> >
>>> > Poll Records Count :292 Time taken :1227 ms
>>> > Poll Records Count :292 Time taken :1181 ms
>>> > Poll Records Count :296 Time taken:1234 ms
>>> > Poll Records Count :292 Time taken:1133 ms
>>> >
>>> > *If I give the offset as 110999, it is faster, and it gets 500 records
>>> > per poll. Why this difference, please?*
>>> >
>>> > Poll Records Count :500 Time taken:1284 ms
>>> > Poll Records Count :500 Time taken:3 ms
>>>
>>> IIUC, a FetchRequest is limited by `fetch.max.bytes`.
>>> If the records from offset "0" are bigger than those from offset
>>> "110999", then a FetchResponse returns fewer records.
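>>>
>>> If the records near offset "0" are large, you could experiment with
>>> bigger fetch sizes (a sketch; the values are examples, not tested
>>> recommendations):
>>>
>>> props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "104857600");
>>> props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "10485760");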
>>>
>>> Please correct me if I misunderstood anything.
>>>
>>> Thanks,
>>> PoAn
>>>
>>> > On Nov 24, 2024, at 2:09 PM, giri mungi <girimung...@gmail.com> wrote:
>>> >
>>> > Hi Team,
>>> >
>>> > Good day to you.
>>> >
>>> > I am Giridhar. I need your suggestions on Kafka performance
>>> > improvement, please.
>>> >
>>> > *Scenario: The user gives an offset as input, and based on that offset
>>> > we need to return the next 1000 messages from the Kafka topic, plus the
>>> > next offset. The Kafka topic contains only one partition.*
>>> >
>>> > We are trying to migrate from the old Kafka to the new Kafka. In the
>>> > old Kafka we were using code like:
>>> >
>>> > *old code (kafka-clients 0.8.1):*
>>> >
>>> > FetchRequest req = new
>>> > FetchRequestBuilder().clientId(clientName).addFetch(a_topic,
>>> >        a_partition, readOffset, fetchSize).build();
>>> > FetchResponse fetchResponse = consumer.fetch(req);
>>> > ByteBufferMessageSet set = fetchResponse.messageSet(a_topic,
>>> a_partition);
>>> >
>>> > This code is super fast, and we are trying to achieve the same with the
>>> > KafkaConsumer API, but we are seeing slowness.
>>> >
>>> > *New KafkaConsumer code (kafka-clients 3.6.1):*
>>> >
>>> > TopicPartition topicPartition = new TopicPartition("Test", 0);
>>> > try (KafkaConsumer<String, String> consumer =
>>> >         KafkaConsumerFactory.createConsumer(clientName, fetchSize)) {
>>> >     consumer.assign(Collections.singletonList(topicPartition));
>>> >     consumer.seekToBeginning(Collections.singletonList(topicPartition));
>>> >     long kafkaEarliestOffset = consumer.position(topicPartition);
>>> >     consumer.seek(topicPartition, readOffset);
>>> >     do {
>>> >         ConsumerRecords<String, String> records =
>>> >             consumer.poll(Duration.ofMillis(1500));
>>> >     } while (!end);
>>> > }
>>> >
>>> > public static KafkaConsumer<String, String> createConsumer(String
>>> >         clientName, int fetchSize) {
>>> >     Properties props = new Properties();
>>> >     String kafkaBrokerStr = Config.getConsumerPropValue("kafkabrokerlist");
>>> >     props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>>> >             StringDeserializer.class.getName());
>>> >     props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>>> >             StringDeserializer.class.getName());
>>> >     props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
>>> >     props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
>>> >     props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>> >     props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
>>> >     props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
>>> >     props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
>>> >     return new KafkaConsumer<>(props);
>>> > }
>>> >
>>> > *Code explanation: Fetching records is taking time for the first poll.*
>>> > Poll Records Count: 500 diff: 1284
>>> > Poll Records Count: 500 diff: 3
>>> >
>>> > For the first 500 records it took 1284 ms, and for the next 500 records
>>> > it took 3 ms.
>>> >
>>> > *Why is there such a difference? I would like to improve the first poll
>>> > time.*
>>> >
>>> >
>>> > 1) How can I fetch the first 500 records in less time?
>>> >
>>> > *I am also seeing one strange issue. My Kafka topic, which has one
>>> > partition, contains some 5 lakh (500,000) records.* *The starting
>>> > records take more time to fetch from Kafka.*
>>> >
>>> > For example, if I give "0" as the input offset, it takes a long time
>>> > (about 6 seconds) and does not even get 500 records; it gets fewer than
>>> > 300 records per poll. Why is this happening, and how can I avoid it?
>>> >
>>> > Poll Records Count :292 Time taken :1227 ms
>>> > Poll Records Count :292 Time taken :1181 ms
>>> > Poll Records Count :296 Time taken:1234 ms
>>> > Poll Records Count :292 Time taken:1133 ms
>>> >
>>> > *If I give the offset as 110999, it is faster, and it gets 500 records
>>> > per poll. Why this difference, please?*
>>> >
>>> > Poll Records Count :500 Time taken:1284 ms
>>> > Poll Records Count :500 Time taken:3 ms
>>> >
>>> >
>>> >
>>> > Please give your suggestions on this.
>>> >
>>> > Regards,
>>> > Giridar
>>>
>>>
