Hi Team,

Good day to you.

I am Giridhar. I would like your suggestions on improving Kafka consumer
performance, please.

*Scenario: the user gives an offset as input and, based on that offset, we
need to return the next 1000 messages from the Kafka topic together with the
next offset. The Kafka topic contains only one partition.*
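
A minimal sketch of the loop this scenario implies, written so it runs
without a broker. OffsetBatchReader, Rec, Batch, readBatch and maxEmptyPolls
are hypothetical names used only for illustration; the poll supplier stands in
for consumer.poll(...) after assign() and seek(readOffset) on the real
consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Sketch: collect up to maxRecords from a given offset and report the next offset. */
final class OffsetBatchReader {

    /** Hypothetical stand-in for a consumed record: its offset and payload. */
    static final class Rec {
        final long offset;
        final String value;

        Rec(long offset, String value) {
            this.offset = offset;
            this.value = value;
        }
    }

    /** Records handed back to the caller plus the offset to use for the next request. */
    static final class Batch {
        final List<Rec> records;
        final long nextOffset;

        Batch(List<Rec> records, long nextOffset) {
            this.records = records;
            this.nextOffset = nextOffset;
        }
    }

    /**
     * Polls until maxRecords are collected or maxEmptyPolls consecutive polls
     * return nothing (assumed here to mean the end of the partition was reached).
     */
    static Batch readBatch(long readOffset, int maxRecords, int maxEmptyPolls,
                           Supplier<List<Rec>> poll) {
        List<Rec> out = new ArrayList<>();
        long nextOffset = readOffset;
        int emptyPolls = 0;
        while (out.size() < maxRecords && emptyPolls < maxEmptyPolls) {
            List<Rec> polled = poll.get();
            if (polled.isEmpty()) {
                emptyPolls++;
                continue;
            }
            emptyPolls = 0;
            for (Rec r : polled) {
                if (out.size() == maxRecords) {
                    break; // surplus records are dropped; the next request seeks again
                }
                out.add(r);
                nextOffset = r.offset + 1; // offset following the last record returned
            }
        }
        return new Batch(out, nextOffset);
    }
}
```

With max.poll.records set to 500, a full batch of 1000 needs at least two
polls, so the cost of the first poll is paid on every request if a new
consumer is created each time.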

We are trying to migrate from old Kafka to new Kafka. With old Kafka we
were using code like this:

*Old code (kafka-clients 0.8.1):*

FetchRequest req = new FetchRequestBuilder()
        .clientId(clientName)
        .addFetch(a_topic, a_partition, readOffset, fetchSize)
        .build();
FetchResponse fetchResponse = consumer.fetch(req);
ByteBufferMessageSet set = fetchResponse.messageSet(a_topic, a_partition);

This code is very fast. We are trying to achieve the same with the
KafkaConsumer API, but it is noticeably slower.

*New KafkaConsumer code (kafka-clients 3.6.1):*

TopicPartition topicPartition = new TopicPartition("Test", 0);
try (KafkaConsumer<String, String> consumer =
        KafkaConsumerFactory.createConsumer(clientName, fetchSize)) {
    // The partition must be assigned before seeking or reading a position.
    consumer.assign(Collections.singletonList(topicPartition));
    consumer.seekToBeginning(Collections.singletonList(topicPartition));
    long kafkaEarliestOffset = consumer.position(topicPartition);
    // Start reading from the offset supplied by the user.
    consumer.seek(topicPartition, readOffset);
    do {
        ConsumerRecords<String, String> records =
                consumer.poll(Duration.ofMillis(1500));
        // ... process records and set end ...
    } while (!end);
}

public static KafkaConsumer<String, String> createConsumer(String clientName, int fetchSize) {
    Properties props = new Properties();
    String kafkaBrokerStr = Config.getConsumerPropValue("kafkabrokerlist");
    // The broker list must be passed to the consumer, otherwise it cannot connect.
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerStr);
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
    props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
    props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
    return new KafkaConsumer<>(props);
}

*Code explanation: fetching records takes a long time on the first poll.*
Poll Records Count: 500 diff: 1284
Poll Records Count: 500 diff: 3

The first 500 records took 1284 ms and the next 500 records took 3 ms.
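
A minimal sketch of how per-poll timings like the "diff" values above can be
taken, assuming a monotonic clock (System.nanoTime) around each call.
PollTimer and timeMillis are hypothetical names; in the real code the timed
call would be consumer.poll(Duration.ofMillis(1500)).

```java
/** Sketch: measure the elapsed time of a single call in milliseconds. */
final class PollTimer {

    /** Runs the call once and returns the elapsed wall-clock time in milliseconds. */
    static long timeMillis(Runnable call) {
        long start = System.nanoTime(); // monotonic, unaffected by clock adjustments
        call.run();
        return (System.nanoTime() - start) / 1_000_000L;
    }
}
```

Timing consumer creation, assign() and seek() separately from the first
poll() in the same way would show how much of the 1284 ms is spent outside
the fetch itself.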

*Why is there such a large difference? I would like to reduce the time taken
by the first poll.*


1) How can I fetch the first 500 records in less time?

 *I am also seeing one strange issue. My Kafka topic, which has one partition,
contains about 5 lakh (500,000) records.* *The earliest records take more time
to fetch from Kafka.*

For example, if I give "0" as the input offset, it takes a long time, as shown
below (about 6 seconds), and it does not return 500 records per poll either;
each poll returns fewer than 300 records. Why is this happening, and how can
I avoid it?

Poll Records Count :292 Time taken :1227 ms
Poll Records Count :292 Time taken :1181 ms
Poll Records Count :296 Time taken:1234 ms
Poll Records Count :292 Time taken:1133 ms

*If I give 110999 as the offset, the fetch is faster and each poll returns
500 records. Why is there this difference?*

Poll Records Count :500 Time taken:1284 ms
Poll Records Count :500 Time taken:3 ms



Please give your suggestions on this.

Regards,
Giridar
