Hi Ping,

1) Yes, the records already exist.
2) Yes, the Kafka broker has also been upgraded to version 3.6; that is why
we are upgrading the client code as well.
3) Do I need to compare any settings between the old broker and the new broker?
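
For example, I was thinking of dumping each broker's configs with the
AdminClient and diffing the output. A rough sketch (broker id "0" and the
localhost bootstrap address are placeholders for our setup; classes are from
org.apache.kafka.clients.admin and org.apache.kafka.common.config):

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
try (Admin admin = Admin.create(props)) {
    ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0"); // placeholder id
    Config config = admin.describeConfigs(Collections.singleton(broker))
        .all().get().get(broker);
    // print every broker config so the old and new clusters can be diffed
    config.entries().forEach(e -> System.out.println(e.name() + " = " + e.value()));
}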

Please help.

Thanks
Giri

On Mon, 25 Nov 2024 at 6:45 AM, Chia-Ping Tsai <chia7...@gmail.com> wrote:

> hi
>
> 1) The records you want to read already exist, right?
>
> 2) Are all your code changes solely focused on upgrading the client code
> from version 0.8 to 3.6? Or does the broker get upgraded as well?
>
>
> Thanks,
> Chia-Ping
>
> > giri mungi <girimung...@gmail.com> 於 2024年11月25日 凌晨1:27 寫道:
> >
> > Hi Ping,
> >
> > My bad, commitSync is not required. We can ignore that.
> >
> > I am calculating the diff as below:
> >
> > long stime = Calendar.getInstance().getTimeInMillis();
> > ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2000));
> > long etime = Calendar.getInstance().getTimeInMillis();
> > log.info("Poll Records Count :{} Diff :{}", records.count(), (etime - stime));
> >
> > Thanks,
> > Giri
> >
> >> On Sun, Nov 24, 2024 at 10:48 PM Chia-Ping Tsai <chia7...@gmail.com> wrote:
> >>
> >> hi Giri
> >>
> >> 1. Why do you call `commitSync`? It seems your application does not use
> >> the consumer group, since your use case is random reads, right?
> >> 2. How do you calculate the "diff"?
> >>
> >> giri mungi <girimung...@gmail.com> 於 2024年11月25日 週一 上午12:50寫道:
> >>
> >>> Hi Ping,
> >>>
> >>> Please find the details below:
> >>>
> >>> 1) Kafka broker version is 3.6.1
> >>>
> >>> 2) Logic Explanation:
> >>>
> >>> It polls messages from Kafka using consumer.poll(Duration.ofMillis(2000)).
> >>>
> >>> *Exit condition:* the loop exits once 1000 or more messages have been
> >>> collected; the end flag is then set to true.
> >>>
> >>>
> >>> boolean end = false;
> >>> consumer.seek(topicPartition, readOffset);
> >>> do {
> >>>     ConsumerRecords<String, String> records =
> >>>         consumer.poll(Duration.ofMillis(2000));
> >>>     for (ConsumerRecord<String, String> consumerRecord : records) {
> >>>         String message = consumerRecord.value();
> >>>         msglist.add(new JSONObject(message));
> >>>     }
> >>>     if (msglist.size() >= 1000) {
> >>>         end = true;
> >>>         consumer.commitSync();
> >>>     }
> >>> } while (!end);
> >>>
> >>>
> >>> Thanks,
> >>> Giri
> >>>
> >>> On Sun, Nov 24, 2024 at 9:23 PM Chia-Ping Tsai <chia7...@gmail.com>
> >> wrote:
> >>>
> >>>> hi
> >>>>
> >>>> 1) Could you share the broker version with us?
> >>>> 2) Could you explain how the sample code works? What is "end"?
> >>>>
> >>>> ```
> >>>>    do {
> >>>>        ConsumerRecords < String, String > records =
> >>>>            consumer.poll(Duration.ofMillis(1500));
> >>>>    } while (!end)
> >>>> ```
> >>>>
> >>>> thanks,
> >>>> Chia-Ping
> >>>>
> >>>>
> >>>> giri mungi <girimung...@gmail.com> 於 2024年11月24日 週日 下午11:15寫道:
> >>>>
> >>>>> Hi,
> >>>>>
> >>>>> They are all from the same consumer, in consecutive polls.
> >>>>>
> >>>>> Before polling we set the offset to the user-supplied offset and try to
> >>>>> consume the next 1000 messages.
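> >>>>>
> >>>>> Roughly like this (a sketch of the pattern; userInputOffset stands for
> >>>>> whatever offset the user passes in):
> >>>>>
> >>>>> consumer.assign(Collections.singletonList(topicPartition));
> >>>>> consumer.seek(topicPartition, userInputOffset); // start at the requested offset
> >>>>> ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2000));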
> >>>>>
> >>>>> Thanks
> >>>>> Giridhar.
> >>>>>
> >>>>> On Sun, 24 Nov 2024 at 8:40 PM, Chia-Ping Tsai <chia7...@gmail.com>
> >>>> wrote:
> >>>>>
> >>>>>> hi
> >>>>>>
> >>>>>>> Poll Records Count :0    diff :2004
> >>>>>>> Poll Records Count :500  diff :943
> >>>>>>
> >>>>>> Are they from the same consumer in each poll? Or are they based on
> >>>>>> different offsets from separate consumers' polls?
> >>>>>>
> >>>>>> thanks,
> >>>>>> Chia-Ping
> >>>>>>
> >>>>>>
> >>>>>> giri mungi <girimung...@gmail.com> 於 2024年11月24日 週日 下午8:51寫道:
> >>>>>>
> >>>>>>> Do I need to check any settings at the Kafka server level?
> >>>>>>>
> >>>>>>> On Sun, Nov 24, 2024 at 6:19 PM giri mungi <
> >> girimung...@gmail.com>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi, I have set the properties as below:
> >>>>>>>>
> >>>>>>>> props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1750000");
> >>>>>>>> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
> >>>>>>>> props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "1500");
> >>>>>>>> props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1750000");
> >>>>>>>>
> >>>>>>>> Poll Records Count :0    diff :2004
> >>>>>>>> Poll Records Count :500  diff :943
> >>>>>>>> Poll Records Count :0    diff :2000
> >>>>>>>> Poll Records Count :44   diff :3
> >>>>>>>> Poll Records Count :500  diff :205
> >>>>>>>> Poll Records Count :488  diff :1978
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> If I set max records to 500 or 1000, then sometimes a poll returns a
> >>>>>>>> count of "0" and takes the full fetch.max.wait.ms, which hurts the
> >>>>>>>> overall response time.
> >>>>>>>>
> >>>>>>>> Even though I have set *fetch.min.bytes and max.poll.records, I am
> >>>>>>>> still getting "0" records, which is impacting the overall response
> >>>>>>>> time.*
> >>>>>>>>
> >>>>>>>> How can I avoid this? Please suggest.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Giridar
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Sun, Nov 24, 2024 at 5:26 PM Chia-Ping Tsai <
> >>>> chia7...@apache.org>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> hi Giridar
> >>>>>>>>>
> >>>>>>>>> It seems your use case involves random reads, and you expect the
> >>>>>>>>> consumer to return 1000 records from the server at once. Therefore,
> >>>>>>>>> you could increase the wait time (fetch.max.wait.ms) and fetch size
> >>>>>>>>> (fetch.min.bytes) to receive a larger response with as many records
> >>>>>>>>> as possible.
> >>>>>>>>>
> >>>>>>>>> A suitable value for fetch.min.bytes might be 'average size of
> >>>>>>>>> records * 1000'. You may also need to increase
> >>>>>>>>> max.partition.fetch.bytes. For fetch.max.wait.ms, a value of 1500 ms
> >>>>>>>>> might match your expectations.
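> >>>>>>>>>
> >>>>>>>>> As an illustration only (the ~1.75 KB average record size here is an
> >>>>>>>>> assumption; plug in your own measurement):
> >>>>>>>>>
> >>>>>>>>> // ~1750 bytes/record * 1000 records ≈ 1,750,000 bytes
> >>>>>>>>> props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1750000");
> >>>>>>>>> props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1750000");
> >>>>>>>>> props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "1500");
> >>>>>>>>> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");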
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Chia-Ping
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 2024/11/24 10:55:23 giri mungi wrote:
> >>>>>>>>>> Hi Yang,
> >>>>>>>>>>
> >>>>>>>>>> *This is the old code, which performs perfectly fine and returns
> >>>>>>>>>> all 1000 records in less than 3 seconds.*
> >>>>>>>>>>
> >>>>>>>>>> do {
> >>>>>>>>>>     FetchRequest req = new FetchRequestBuilder().clientId(clientName)
> >>>>>>>>>>         .addFetch(a_topic, a_partition, readOffset, fetchSize)
> >>>>>>>>>>         .build();
> >>>>>>>>>>     FetchResponse fetchResponse = consumer.fetch(req);
> >>>>>>>>>>
> >>>>>>>>>>     ByteBufferMessageSet set = fetchResponse.messageSet(a_topic, a_partition);
> >>>>>>>>>>
> >>>>>>>>>>     for (MessageAndOffset messageAndOffset : set) {
> >>>>>>>>>>         if (messageAndOffset.offset() < readOffset) {
> >>>>>>>>>>             log.warn("Found an old offset: {}, Expecting: {}",
> >>>>>>>>>>                 messageAndOffset.offset(), readOffset);
> >>>>>>>>>>             continue;
> >>>>>>>>>>         }
> >>>>>>>>>>
> >>>>>>>>>>         ByteBuffer payload = messageAndOffset.message().payload();
> >>>>>>>>>>         byte[] bytes = new byte[payload.limit()];
> >>>>>>>>>>         payload.get(bytes);
> >>>>>>>>>>         String message;
> >>>>>>>>>>         try {
> >>>>>>>>>>             message = new String(bytes, "UTF-8");
> >>>>>>>>>>         } catch (UnsupportedEncodingException ue) {
> >>>>>>>>>>             log.warn(ue.getMessage(), ue);
> >>>>>>>>>>             message = new String(bytes);
> >>>>>>>>>>         }
> >>>>>>>>>>
> >>>>>>>>>>         msglist.add(new JSONObject(message));
> >>>>>>>>>>         readOffset = messageAndOffset.nextOffset();
> >>>>>>>>>>     }
> >>>>>>>>>>
> >>>>>>>>>>     if (msglist.size() >= Math.round(limit
> >>>>>>>>>>             / inputReq.getApplicationArea().getReqInfo().size())
> >>>>>>>>>>         || (endTime - startTime) >= waitTime) {
> >>>>>>>>>>         log.info("Wait condition has been met... exiting the fetch loop. "
> >>>>>>>>>>             + "recordCount - {}, time exhausted - {} ms.",
> >>>>>>>>>>             msglist.size(), (endTime - startTime));
> >>>>>>>>>>         end = true;
> >>>>>>>>>>     }
> >>>>>>>>>> } while (!end);
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> *The new code takes 8 seconds, which is more than double the time.*
> >>>>>>>>>>
> >>>>>>>>>> TopicPartition topicPartition = new TopicPartition("Test", 0);
> >>>>>>>>>> try (KafkaConsumer<String, String> consumer =
> >>>>>>>>>>         KafkaConsumerFactory.createConsumer(clientName, fetchSize)) {
> >>>>>>>>>>     consumer.partitionsFor(topicName);
> >>>>>>>>>>     consumer.assign(Collections.singletonList(topicPartition));
> >>>>>>>>>>     consumer.seekToBeginning(Collections.singletonList(topicPartition));
> >>>>>>>>>>     long kafkaEarliestOffset = consumer.position(topicPartition);
> >>>>>>>>>>     consumer.seek(topicPartition, readOffset);
> >>>>>>>>>>     do {
> >>>>>>>>>>         ConsumerRecords<String, String> records =
> >>>>>>>>>>             consumer.poll(Duration.ofMillis(1500));
> >>>>>>>>>>     } while (!end);
> >>>>>>>>>> }
> >>>>>>>>>>
> >>>>>>>>>> public static KafkaConsumer<String, String> createConsumer(String clientName, int fetchSize) {
> >>>>>>>>>>     Properties props = new Properties();
> >>>>>>>>>>     String kafkaBrokerStr = Config.getConsumerPropValue("kafkabrokerslist");
> >>>>>>>>>>     String groupId = Config.getConsumerPropValue("group.id");
> >>>>>>>>>>     props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerStr);
> >>>>>>>>>>     props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> >>>>>>>>>>         StringDeserializer.class.getName());
> >>>>>>>>>>     props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> >>>>>>>>>>         StringDeserializer.class.getName());
> >>>>>>>>>>     props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
> >>>>>>>>>>     props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
> >>>>>>>>>>     props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> >>>>>>>>>>     props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
> >>>>>>>>>>     props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
> >>>>>>>>>>     props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
> >>>>>>>>>>     return new KafkaConsumer<String, String>(props);
> >>>>>>>>>> }
> >>>>>>>>>>
> >>>>>>>>>> How can we improve the performance of this?
> >>>>>>>>>>
> >>>>>>>>>> On Sun, Nov 24, 2024 at 4:11 PM giri mungi <
> >>>> girimung...@gmail.com
> >>>>>>
> >>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi Yang,
> >>>>>>>>>>>
> >>>>>>>>>>> Can I get the records from Kafka as raw bytes or in compressed
> >>>>>>>>>>> form, so that fetching from Kafka takes less time?
> >>>>>>>>>>>
> >>>>>>>>>>> I can build the messages from those bytes myself. Is that possible?
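> >>>>>>>>>>>
> >>>>>>>>>>> For example, would switching the deserializers to byte arrays do
> >>>>>>>>>>> it? Something like this (just a guess at the configuration):
> >>>>>>>>>>>
> >>>>>>>>>>> // hand back raw byte[] values instead of decoded Strings
> >>>>>>>>>>> props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> >>>>>>>>>>>     ByteArrayDeserializer.class.getName());
> >>>>>>>>>>> props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> >>>>>>>>>>>     ByteArrayDeserializer.class.getName());
> >>>>>>>>>>> KafkaConsumer<byte[], byte[]> rawConsumer = new KafkaConsumer<>(props);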
> >>>>>>>>>>>
> >>>>>>>>>>> Can you please give me suggestions on this?
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>> Giridar
> >>>>>>>>>>>
> >>>>>>>>>>> On Sun, Nov 24, 2024 at 3:50 PM giri mungi <
> >>>>> girimung...@gmail.com
> >>>>>>>
> >>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi Yang,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks for your reply.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Now, what should I do to improve performance? The old Kafka code
> >>>>>>>>>>>> performed well.
> >>>>>>>>>>>>
> >>>>>>>>>>>> These are the properties:
> >>>>>>>>>>>>
> >>>>>>>>>>>> props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
> >>>>>>>>>>>> props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
> >>>>>>>>>>>> props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> >>>>>>>>>>>> props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
> >>>>>>>>>>>> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
> >>>>>>>>>>>> props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
> >>>>>>>>>>>>
> >>>>>>>>>>>> How can we decrease this time? Please suggest.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks,
> >>>>>>>>>>>> Giridar
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Sun, Nov 24, 2024 at 3:15 PM PoAn Yang <
> >>>> yangp...@gmail.com>
> >>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Giridar,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> *Code explanation: fetching records takes time for the first poll.*
> >>>>>>>>>>>>>> Poll Records Count: 500 diff: 1284
> >>>>>>>>>>>>>> Poll Records Count: 500 diff: 3
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> For the first 500 records it took 1284 ms, and the next 500
> >>>>>>>>>>>>>> records took 3 ms.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> *Why is there this much difference? I would like to improve the
> >>>>>>>>>>>>>> performance of the first poll.*
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> IIRC, a consumer has a FetchBuffer (cache).
> >>>>>>>>>>>>> If a FetchResponse returns more records than `max.poll.records`,
> >>>>>>>>>>>>> the extra records are stored in the FetchBuffer.
> >>>>>>>>>>>>> That is why the second poll can return data in a short time.
> >>>>>>>>>>>>> IMO, we don’t need to balance each poll’s time, because that
> >>>>>>>>>>>>> would mean giving up the local cache.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> For example, if I give "0" as the input offset, it takes about 6
> >>>>>>>>>>>>>> seconds (as below) and does not even return 500 records; it gets
> >>>>>>>>>>>>>> only around 290 records per poll and takes a lot of time. Why is
> >>>>>>>>>>>>>> this happening, and how can I avoid it?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Poll Records Count :292 Time taken :1227 ms
> >>>>>>>>>>>>>> Poll Records Count :292 Time taken :1181 ms
> >>>>>>>>>>>>>> Poll Records Count :296 Time taken:1234 ms
> >>>>>>>>>>>>>> Poll Records Count :292 Time taken:1133 ms
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> *If I give an offset of 110999, it is faster and each poll
> >>>>>>>>>>>>>> returns 500 records. Why this difference?*
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Poll Records Count :500 Time taken:1284 ms
> >>>>>>>>>>>>>> Poll Records Count :500 Time taken:3 ms
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> IIUC, a FetchRequest is limited by `fetch.max.bytes`.
> >>>>>>>>>>>>> If the records around offset “0” are bigger than those around
> >>>>>>>>>>>>> offset “110999”, then a FetchResponse returns fewer records.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Please correct me if I misunderstood anything.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>> PoAn
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Nov 24, 2024, at 2:09 PM, giri mungi <
> >>>>>> girimung...@gmail.com>
> >>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi Team,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Good day to you.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I am Giridhar. I need your suggestions on Kafka
> >>>>>>>>>>>>>> performance improvement, please.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> *Scenario: the user gives an offset as input, and based on that
> >>>>>>>>>>>>>> offset we need to return the next 1000 messages from the Kafka
> >>>>>>>>>>>>>> topic, plus the next offset. The Kafka topic contains only one
> >>>>>>>>>>>>>> partition.*
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> We are trying to migrate from the old Kafka to the new Kafka. In
> >>>>>>>>>>>>>> the old Kafka we were using code like:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> *Old code (kafka-clients 0.8.1):*
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> FetchRequest req = new FetchRequestBuilder().clientId(clientName)
> >>>>>>>>>>>>>>     .addFetch(a_topic, a_partition, readOffset, fetchSize).build();
> >>>>>>>>>>>>>> FetchResponse fetchResponse = consumer.fetch(req);
> >>>>>>>>>>>>>> ByteBufferMessageSet set = fetchResponse.messageSet(a_topic, a_partition);
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> This code is very fast. We are trying to achieve the same thing
> >>>>>>>>>>>>>> using the KafkaConsumer API and are seeing slowness.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> *New KafkaConsumer code (kafka-clients 3.6.1):*
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> TopicPartition topicPartition = new TopicPartition("Test", 0);
> >>>>>>>>>>>>>> try (KafkaConsumer<String, String> consumer =
> >>>>>>>>>>>>>>         KafkaConsumerFactory.createConsumer(clientName, fetchSize)) {
> >>>>>>>>>>>>>>     consumer.assign(Collections.singletonList(topicPartition));
> >>>>>>>>>>>>>>     consumer.seekToBeginning(Collections.singletonList(topicPartition));
> >>>>>>>>>>>>>>     long kafkaEarliestOffset = consumer.position(topicPartition);
> >>>>>>>>>>>>>>     consumer.seek(topicPartition, readOffset);
> >>>>>>>>>>>>>>     do {
> >>>>>>>>>>>>>>         ConsumerRecords<String, String> records =
> >>>>>>>>>>>>>>             consumer.poll(Duration.ofMillis(1500));
> >>>>>>>>>>>>>>     } while (!end);
> >>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> public static KafkaConsumer<String, String> createConsumer(String clientName, int fetchSize) {
> >>>>>>>>>>>>>>     Properties props = new Properties();
> >>>>>>>>>>>>>>     String kafkaBrokerStr = Config.getConsumerPropValue("kafkabrokerlist");
> >>>>>>>>>>>>>>     props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerStr);
> >>>>>>>>>>>>>>     props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> >>>>>>>>>>>>>>         StringDeserializer.class.getName());
> >>>>>>>>>>>>>>     props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> >>>>>>>>>>>>>>         StringDeserializer.class.getName());
> >>>>>>>>>>>>>>     props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
> >>>>>>>>>>>>>>     props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
> >>>>>>>>>>>>>>     props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> >>>>>>>>>>>>>>     props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
> >>>>>>>>>>>>>>     props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
> >>>>>>>>>>>>>>     props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
> >>>>>>>>>>>>>>     return new KafkaConsumer<String, String>(props);
> >>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> *Code explanation: fetching records takes time for the first poll.*
> >>>>>>>>>>>>>> Poll Records Count: 500 diff: 1284
> >>>>>>>>>>>>>> Poll Records Count: 500 diff: 3
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> For the first 500 records it took 1284 ms, and the next 500
> >>>>>>>>>>>>>> records took 3 ms.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> *Why is there this much difference? I would like to improve the
> >>>>>>>>>>>>>> performance of the first poll.*
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1) How can we fetch the first 500 records in less time?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> *I am also seeing one strange issue. My Kafka topic, which has
> >>>>>>>>>>>>>> one partition, contains some 500,000 (5 lakh) records. The
> >>>>>>>>>>>>>> earliest records take more time to fetch from Kafka.*
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> For example, if I give "0" as the input offset, it takes about 6
> >>>>>>>>>>>>>> seconds (as below) and does not even return 500 records; it gets
> >>>>>>>>>>>>>> only around 290 records per poll and takes a lot of time. Why is
> >>>>>>>>>>>>>> this happening, and how can I avoid it?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Poll Records Count :292 Time taken :1227 ms
> >>>>>>>>>>>>>> Poll Records Count :292 Time taken :1181 ms
> >>>>>>>>>>>>>> Poll Records Count :296 Time taken:1234 ms
> >>>>>>>>>>>>>> Poll Records Count :292 Time taken:1133 ms
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> *If I give an offset of 110999, it is faster and each poll
> >>>>>>>>>>>>>> returns 500 records. Why this difference?*
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Poll Records Count :500 Time taken:1284 ms
> >>>>>>>>>>>>>> Poll Records Count :500 Time taken:3 ms
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Please give your suggestions on this.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>> Giridar
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
>
