Hi all,

Please give suggestion on this.please help ,Iam stuck with this.

Thanks
Giri
On Mon, 25 Nov 2024 at 9:22 AM, giri mungi <girimung...@gmail.com> wrote:

> Hi Ping,
>
> 1) Yes the records are already existent.
> 2)  Yeah the Kafka broker also upgraded to 3.6 version.
> That’s why we are upgrading the client code also.
> 3) Do I need to compare any settings between old broker and new broker?
>
> Please help.
>
> Thanks
> Giri
>
> On Mon, 25 Nov 2024 at 6:45 AM, Chia-Ping Tsai <chia7...@gmail.com> wrote:
>
>> hi
>>
>> 1) The records you want to read are already existent, right?
>>
>> 2) Are all your code changes solely focused on upgrading the client code
>> from version 0.8 to 3.6? Or does the broker get upgraded as well?
>>
>>
>> Thanks,
>> Chia-Ping
>>
>> > giri mungi <girimung...@gmail.com> 於 2024年11月25日 凌晨1:27 寫道:
>> >
>> > Hi Ping,
>> >
>> > My bad,commitSync is not required.we can ignore that.
>> >
>> > Iam calculating diff as below :
>> >
>> > long stime = Calendar.getInstance().getTimeInMillis();
>> > ConsumerRecords <String,String> records =
>> > consumer.poll(Duration.ofMillis(2000));
>> > long etime = Calendar.getInstance().getTimeInMillis();
>> > log.info("Poll Records Count :{} Diff
>> :{}",records.count(),(etime-stime));
>> >
>> > Thanks,
>> > Giri
>> >
>> >> On Sun, Nov 24, 2024 at 10:48 PM Chia-Ping Tsai <chia7...@gmail.com>
>> wrote:
>> >>
>> >> hi Giri
>> >>
>> >> 1. Why do you call `commitSync`? it seems your application does not
>> use the
>> >> consumer group as your use case is to random read, right?
>> >> 2. how do you calculate "diff"?
>> >>
>> >> giri mungi <girimung...@gmail.com> 於 2024年11月25日 週一 上午12:50寫道:
>> >>
>> >>> Hi Ping,
>> >>>
>> >>> Please find the details below:
>> >>>
>> >>> 1) Kafka broker version is 3.6.1
>> >>>
>> >>> 2) Logic Explanation:
>> >>>
>> >>> Polls messages from Kafka using
>> consumer.poll(Duration.ofMillis(2000)).
>> >>>
>> >>> *Exit Conditions:* The loop exits when Message Limit > 1000 is
>> reached:
>> >>> Then the end flag is true and the loop will exit.
>> >>>
>> >>>
>> >>> boolean end = false;
>> >>> consumer.seek(topicPartition, readOffset);
>> >>> do {
>> >>> JSONObject obj = null;
>> >>> long stime = Calendar.getInstance().getTimeInMillis();
>> >>> ConsumerRecords < String, String > records =
>> >>> consumer.poll(Duration.ofMillis(2000));
>> >>> for (ConsumerRecord< String, String > consumerRecord: records) {
>> >>> String message = consumerRecord.value();
>> >>> obj = new JSONObject(message);
>> >>> msglist.add(obj);
>> >>> }
>> >>> }
>> >>> if (msglist.size() >=1000) {
>> >>>   end = true;
>> >>> consumer.commitSync();
>> >>> }
>> >>> } while (!end);
>> >>>
>> >>>
>> >>> Thanks,
>> >>> Giri
>> >>>
>> >>> On Sun, Nov 24, 2024 at 9:23 PM Chia-Ping Tsai <chia7...@gmail.com>
>> >> wrote:
>> >>>
>> >>>> hi
>> >>>>
>> >>>> 1) could you share the broker version to us?
>> >>>> 2) could you explain how the sample code works? what is the "end"?
>> >>>>
>> >>>> ```
>> >>>>    do {
>> >>>>        ConsumerRecords < String, String > records =
>> >>>>            consumer.poll(Duration.ofMillis(1500));
>> >>>>    } while (!end)
>> >>>> ```
>> >>>>
>> >>>> thanks,
>> >>>> Chia-Ping
>> >>>>
>> >>>>
>> >>>> giri mungi <girimung...@gmail.com> 於 2024年11月24日 週日 下午11:15寫道:
>> >>>>
>> >>>>> Hi,
>> >>>>>
>> >>>>> All of them are from same consumer in each poll.
>> >>>>>
>> >>>>> Before poll we set offset to user input offset and try to consume
>> >> next
>> >>>> 1000
>> >>>>> messages.
>> >>>>>
>> >>>>> Thanks
>> >>>>> Giridhar.
>> >>>>>
>> >>>>> On Sun, 24 Nov 2024 at 8:40 PM, Chia-Ping Tsai <chia7...@gmail.com>
>> >>>> wrote:
>> >>>>>
>> >>>>>> hi
>> >>>>>>
>> >>>>>>> Poll Records Count :0    diff :2004
>> >>>>>> Poll Records Count :500  diff :943
>> >>>>>>
>> >>>>>> Are them from the same consumer in each poll? Or they are based on
>> >>>>>> different "offset" and separate consumer's poll?
>> >>>>>>
>> >>>>>> thanks,
>> >>>>>> Chia-Ping
>> >>>>>>
>> >>>>>>
>> >>>>>> giri mungi <girimung...@gmail.com> 於 2024年11月24日 週日 下午8:51寫道:
>> >>>>>>
>> >>>>>>> Do i need to check any settings in the kafka server level?
>> >>>>>>>
>> >>>>>>> On Sun, Nov 24, 2024 at 6:19 PM giri mungi <
>> >> girimung...@gmail.com>
>> >>>>>> wrote:
>> >>>>>>>
>> >>>>>>>> Hi I have set the below properties as below:
>> >>>>>>>>
>> >>>>>>>> props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,"1750000");
>> >>>>>>>> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
>> >>>>>>>> props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "1500");
>> >>>>>>>> props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
>> >>>>> "1750000");
>> >>>>>>>>
>> >>>>>>>> Poll Records Count :0    diff :2004
>> >>>>>>>> Poll Records Count :500  diff :943
>> >>>>>>>> Poll Records Count :0    diff :2000
>> >>>>>>>> Poll Records Count :44   diff :3
>> >>>>>>>> Poll Records Count :500  diff :205
>> >>>>>>>> Poll Records Count :488  diff :1978
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> if i set max records as 500 or 1000 then sometimes i am getting
>> >>>> count
>> >>>>>> as
>> >>>>>>>> "0"  and taking max wait ms time which is impacting the overall
>> >>>>>> response
>> >>>>>>>> time..
>> >>>>>>>>
>> >>>>>>>> Even though I have mentioned *fetch_min_bytes &
>> >> max_poll_records
>> >>>> iam
>> >>>>>>>> getting  "0" records  which is impacting the overall response
>> >>>> time.*
>> >>>>>>>>
>> >>>>>>>> How to avoid this please.
>> >>>>>>>>
>> >>>>>>>> please suggest this.
>> >>>>>>>>
>> >>>>>>>> Thanks,
>> >>>>>>>> Giridar
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> On Sun, Nov 24, 2024 at 5:26 PM Chia-Ping Tsai <
>> >>>> chia7...@apache.org>
>> >>>>>>>> wrote:
>> >>>>>>>>
>> >>>>>>>>> hi Giridar
>> >>>>>>>>>
>> >>>>>>>>> It seems your use case involves random reads, and you expect
>> >> the
>> >>>>>>> consumer
>> >>>>>>>>> to return 1000 records from server at once. Therefore, you
>> >> could
>> >>>>>>> increase
>> >>>>>>>>> the wait time (fetch.max.wait.ms) and fetch size
>> >>>> (fetch.min.bytes)
>> >>>>> to
>> >>>>>>>>> receive a larger response with as many records as possible.
>> >>>>>>>>>
>> >>>>>>>>> A suitable value for fetch.min.bytes might be 'average size of
>> >>>>>> records *
>> >>>>>>>>> 1000'. You may also need to increase
>> >> max.partition.fetch.bytes.
>> >>>> For
>> >>>>>>>>> fetch.max.wait.ms, a value of 1500 ms might match your
>> >>>>> expectations.
>> >>>>>>>>>
>> >>>>>>>>> Best,
>> >>>>>>>>> Chia-Ping
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> On 2024/11/24 10:55:23 giri mungi wrote:
>> >>>>>>>>>> Hi Yang,
>> >>>>>>>>>>
>> >>>>>>>>>> *This is the old code which is perfectly doing fine and
>> >>>> returning
>> >>>>>> less
>> >>>>>>>>> than
>> >>>>>>>>>> 3 seconds for all 1000 records.*
>> >>>>>>>>>>
>> >>>>>>>>>> do {
>> >>>>>>>>>> FetchRequest req = new
>> >>>> FetchRequestBuilder().clientId(clientName)
>> >>>>>>>>>> .addFetch(a_topic, a_partition, readOffset, fetchSize)
>> >>>>>>>>>> .build();
>> >>>>>>>>>>
>> >>>>>>>>>> FetchResponse fetchResponse = consumer.fetch(req);
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> JSONObject obj = null;
>> >>>>>>>>>> ByteBufferMessageSet set = fetchResponse.messageSet(a_topic,
>> >>>>>>>>>> a_partition);
>> >>>>>>>>>>
>> >>>>>>>>>> for (MessageAndOffset messageAndOffset : set) {
>> >>>>>>>>>> String message = null;
>> >>>>>>>>>> if (messageAndOffset.offset() < readOffset) {
>> >>>>>>>>>> log.warn("Found an old offset: {} ,Expecting: {}",
>> >>>>>>>>>> messageAndOffset.offset(), readOffset);
>> >>>>>>>>>> continue;
>> >>>>>>>>>> }
>> >>>>>>>>>>
>> >>>>>>>>>> ByteBuffer payload = messageAndOffset.message().payload();
>> >>>>>>>>>>
>> >>>>>>>>>> byte[] bytes = new byte[payload.limit()];
>> >>>>>>>>>> payload.get(bytes);
>> >>>>>>>>>> try {
>> >>>>>>>>>> message = new String(bytes, "UTF-8");
>> >>>>>>>>>> } catch (UnsupportedEncodingException ue) {
>> >>>>>>>>>> log.warn(ue.getMessage(), ue);
>> >>>>>>>>>> message = new String(bytes);
>> >>>>>>>>>> }
>> >>>>>>>>>>
>> >>>>>>>>>> obj = new JSONObject(message);
>> >>>>>>>>>> msglist.add(new JSONObject(message));
>> >>>>>>>>>> }
>> >>>>>>>>>> readOffset = messageAndOffset.nextOffset();
>> >>>>>>>>>>
>> >>>>>>>>>> if (msglist.size() >= Math.round(limit
>> >>>>>>>>>> / inputReq.getApplicationArea().getReqInfo().size())
>> >>>>>>>>>> || (endTime - startTime) >= waitTime) {
>> >>>>>>>>>> log.info(
>> >>>>>>>>>> "Wait condition has been met... exiting the fetch loop.
>> >>>>> recordCount
>> >>>>>> -
>> >>>>>>>>> {},
>> >>>>>>>>>> time exhausted - {} ms.",
>> >>>>>>>>>> msglist.size(), (endTime - startTime));
>> >>>>>>>>>> end = true;
>> >>>>>>>>>> }
>> >>>>>>>>>>
>> >>>>>>>>>> }  while (!end);
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> *The new code is taking 8 seconds.. which is more than
>> >> double
>> >>>> the
>> >>>>>>> time*
>> >>>>>>>>>>
>> >>>>>>>>>> TopicPartition topicPartition = new TopicPartition("Test",
>> >> 0);
>> >>>>>>>>>>
>> >>>>> consumer.seekToBeginning(Collections.singletonList(topicPartition));
>> >>>>>>>>>> long kafkaEarliestOffset =
>> >> consumer.position(topicPartition);
>> >>>>>>>>>> try (KafkaConsumer < String, String > consumer =
>> >>>>>>>>>> KafkaConsumerFactory.createConsumer(clientName, fetchSize))
>> >> {
>> >>>>>>>>>>    consumer.partitionsFor(topicName);
>> >>>>>>>>>>
>> >>> consumer.assign(Collections.singletonList(topicPartition));
>> >>>>>>>>>>    consumer.seek(topicPartition, readOffset);
>> >>>>>>>>>>    do {
>> >>>>>>>>>>        ConsumerRecords < String, String > records =
>> >>>>>>>>>>            consumer.poll(Duration.ofMillis(1500));
>> >>>>>>>>>>    } while (!end)
>> >>>>>>>>>>
>> >>>>>>>>>> public static KafkaConsumer<String,String>
>> >>> createConsumer(String
>> >>>>>>>>>> clientName,int fetchSize) {
>> >>>>>>>>>>            Properties props = new Properties();
>> >>>>>>>>>>            String kafkaBrokerStr =
>> >>>>>>>>>> Config.getConsumerPropValue("kafkabrokerslist");
>> >>>>>>>>>>            String groupId = Config.getConsumerPropValue("
>> >>>>> group.id
>> >>>>>> ");
>> >>>>>>>>>>
>> >>>>>>>>>
>> >> props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>> >>>>>>>>>> StringDeserializer.class.getName());
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>> props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>> >>>>>>>>>> StringDeserializer.class.getName());
>> >>>>>>>>>>
>> >> props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
>> >>>> "");
>> >>>>>>>>>>
>> >>>>>>> props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
>> >>>>>>>>>> "false");
>> >>>>>>>>>>
>> >>>>>> props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
>> >>>>>>>>>> "earliest");
>> >>>>>>>>>>            props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,
>> >>>>>> "1024");
>> >>>>>>>>>>
>> >>>>> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"500");
>> >>>>>>>>>>
>> >> props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG,
>> >>>>>> "500");
>> >>>>>>>>>>            return new KafkaConsumer<String,String>(props);
>> >>>>>>>>>>   }
>> >>>>>>>>>>
>> >>>>>>>>>> How to improve the performance on this.
>> >>>>>>>>>>
>> >>>>>>>>>> On Sun, Nov 24, 2024 at 4:11 PM giri mungi <
>> >>>> girimung...@gmail.com
>> >>>>>>
>> >>>>>>>>> wrote:
>> >>>>>>>>>>
>> >>>>>>>>>>> Hi Yang,
>> >>>>>>>>>>>
>> >>>>>>>>>>> Can i get the records from kafka as bytes or compression
>> >>> form
>> >>>> so
>> >>>>>>> that
>> >>>>>>>>> i
>> >>>>>>>>>>> will take less time from kafka.
>> >>>>>>>>>>>
>> >>>>>>>>>>> I can build messages from those bytes.Is that possible?
>> >>>>>>>>>>>
>> >>>>>>>>>>> Can you please give suggestions on this.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Thanks,
>> >>>>>>>>>>> Giridar
>> >>>>>>>>>>>
>> >>>>>>>>>>> On Sun, Nov 24, 2024 at 3:50 PM giri mungi <
>> >>>>> girimung...@gmail.com
>> >>>>>>>
>> >>>>>>>>> wrote:
>> >>>>>>>>>>>
>> >>>>>>>>>>>> Hi Yang,
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Thanks for your reply.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Now what should I do to improve my performance?Because
>> >> the
>> >>>> old
>> >>>>>>> kafka
>> >>>>>>>>> code
>> >>>>>>>>>>>> was good in performance
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> These are the properties:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
>> >>>>>>>>>>>>
>> >> props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
>> >>>>>>>>>>>> "false");
>> >>>>>>>>>>>>
>> >> props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
>> >>>>>>>>>>>> "earliest");
>> >>>>>>>>>>>> props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,
>> >> "1024");
>> >>>>>>>>>>>> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"500");
>> >>>>>>>>>>>> props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG,
>> >> "500");
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> How to decrease this time ,plz suggest .
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Thanks,
>> >>>>>>>>>>>> Giridar
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> On Sun, Nov 24, 2024 at 3:15 PM PoAn Yang <
>> >>>> yangp...@gmail.com>
>> >>>>>>>>> wrote:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>> Hi Giridar,
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>>> *Code explanation:Fetching records is taking time for
>> >>> the
>> >>>>>> first
>> >>>>>>>>> poll.*
>> >>>>>>>>>>>>>> Poll Records Count: 500 diff: 1284
>> >>>>>>>>>>>>>> Poll Records Count: 500 diff: 3
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> For the first 500 records it took 1284 ms and next 500
>> >>>>> records
>> >>>>>>> it
>> >>>>>>>>> took
>> >>>>>>>>>>>>> 4 ms
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> *Why this much difference? I would like to improve the
>> >>>>>>>>> performance of
>> >>>>>>>>>>>>> the
>> >>>>>>>>>>>>>> first poll time?*
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> IIRC, a consumer has FetchBuffer (cache).
>> >>>>>>>>>>>>> If a FetchResponse can return records morn than
>> >>>>>>> `max.poll.records`,
>> >>>>>>>>> the
>> >>>>>>>>>>>>> extra records will be stored in FetchBuffer.
>> >>>>>>>>>>>>> That is why the second poll can get data in short time.
>> >>>>>>>>>>>>> IMO, we don’t need to balance each poll time, because
>> >> that
>> >>>>> means
>> >>>>>>> we
>> >>>>>>>>>>>>> don’t want local cache.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>>> For example if i give "0" as input offset and it is
>> >>> taking
>> >>>>>> time
>> >>>>>>> as
>> >>>>>>>>>>>>> below (6
>> >>>>>>>>>>>>>> seconds) and not getting 500 records also,it is
>> >> getting
>> >>>> only
>> >>>>>> 200
>> >>>>>>>>>>>>> records
>> >>>>>>>>>>>>>> per poll and taking lot of time...why this is
>> >> happening
>> >>>> and
>> >>>>>> how
>> >>>>>>> to
>> >>>>>>>>>>>>> avoid
>> >>>>>>>>>>>>>> this.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Poll Records Count :292 Time taken :1227 ms
>> >>>>>>>>>>>>>> Poll Records Count :292 Time taken :1181 ms
>> >>>>>>>>>>>>>> Poll Records Count :296 Time taken:1234 ms
>> >>>>>>>>>>>>>> Poll Records Count :292 Time taken:1133 ms
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> *If I give an offset as 110999 and it is getting some
>> >>> fast
>> >>>>> and
>> >>>>>>>>> records
>> >>>>>>>>>>>>>> getting as 500 each..Why this difference please.*
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Poll Records Count :500 Time taken:1284 ms
>> >>>>>>>>>>>>>> Poll Records Count :500 Time taken:3 ms
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> IIUC, a FetchRequest has a limitation from
>> >>>> `fetch.max.bytes`.
>> >>>>>>>>>>>>> If the record size from offset “0” is bigger than from
>> >>>> offset
>> >>>>>>>>> “110999”,
>> >>>>>>>>>>>>> then a FetchResponse returns less records.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> Please correct me if I misunderstood anything.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> Thanks,
>> >>>>>>>>>>>>> PoAn
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>>> On Nov 24, 2024, at 2:09 PM, giri mungi <
>> >>>>>> girimung...@gmail.com>
>> >>>>>>>>> wrote:
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Hi Team,
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Good day to you.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Iam Giridhar.I need your suggestions in kafka
>> >>>>>>>>>>>>>> performance improvement please.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> *Scenario is: The user will give the offset as input
>> >> and
>> >>>>> based
>> >>>>>>> on
>> >>>>>>>>> the
>> >>>>>>>>>>>>>> offset we need to give the next 1000 messages from
>> >> kafka
>> >>>>> topic
>> >>>>>>>>> and next
>> >>>>>>>>>>>>>> offset.The kafka topic contains only one partition.*
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> We are trying to migrate from old kafka to new
>> >> kafka.In
>> >>>> the
>> >>>>>> old
>> >>>>>>>>> kafka
>> >>>>>>>>>>>>> we
>> >>>>>>>>>>>>>> were using code like:
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> *old code(kafka clients 0.8 .1):*
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> FetchRequest req = new
>> >>>>>>>>>>>>>>
>> >>>> FetchRequestBuilder().clientId(clientName).addFetch(a_topic,
>> >>>>>>>>>>>>>>       a_partition, readOffset, fetchSize).build();
>> >>>>>>>>>>>>>> FetchResponse fetchResponse = consumer.fetch(req);
>> >>>>>>>>>>>>>> ByteBufferMessageSet set =
>> >>>> fetchResponse.messageSet(a_topic,
>> >>>>>>>>>>>>> a_partition);
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> This code is super fast and same we are trying to
>> >>> achieve
>> >>>>>> using
>> >>>>>>>>>>>>>> KafkaConsumer API and getting slowness
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> *New kafkaconsumer code is using(kafka clients 3.6
>> >> .1)*
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> TopicPartition topicPartition = new
>> >>> TopicPartition("Test",
>> >>>>> 0);
>> >>>>>>>>>>>>>>
>> >>>>>>>>>
>> >>>> consumer.seekToBeginning(Collections.singletonList(topicPartition));
>> >>>>>>>>>>>>>> long kafkaEarliestOffset =
>> >>>>> consumer.position(topicPartition);
>> >>>>>>>>>>>>>> try (KafkaConsumer < String, String > consumer =
>> >>>>>>>>>>>>>> KafkaConsumerFactory.createConsumer(clientName,
>> >>>> fetchSize))
>> >>>>> {
>> >>>>>>>>>>>>>>
>> >>>>> consumer.assign(Collections.singletonList(topicPartition));
>> >>>>>>>>>>>>>>   consumer.seek(topicPartition, readOffset);
>> >>>>>>>>>>>>>>   do {
>> >>>>>>>>>>>>>>       ConsumerRecords < String, String > records =
>> >>>>>>>>>>>>>>           consumer.poll(Duration.ofMillis(1500));
>> >>>>>>>>>>>>>>   } while (!end)
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> public static KafkaConsumer<String,String>
>> >>>>>> createConsumer(String
>> >>>>>>>>>>>>>> clientName,int fetchSize) {
>> >>>>>>>>>>>>>>           Properties props = new Properties();
>> >>>>>>>>>>>>>>           String kafkaBrokerStr =
>> >>>>>>>>>>>>>> Config.getConsumerPropValue("kafkabrokerlist");
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>
>> >>>>> props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>> >>>>>>>>>>>>>> StringDeserializer.class.getName());
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>
>> >>>>>>> props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>> >>>>>>>>>>>>>> StringDeserializer.class.getName());
>> >>>>>>>>>>>>>>
>> >>>> props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
>> >>>>>>> "");
>> >>>>>>>>>>>>>>
>> >>>>>>>>> props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
>> >>>>>>>>>>>>>> "false");
>> >>>>>>>>>>>>>>
>> >>>>>>>>> props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
>> >>>>>>>>>>>>>> "earliest");
>> >>>>>>>>>>>>>>
>> >>>> props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,
>> >>>>>>>>> "1024");
>> >>>>>>>>>>>>>>
>> >>>>>>>>> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"500");
>> >>>>>>>>>>>>>>
>> >>>>> props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG,
>> >>>>>>>>> "500");
>> >>>>>>>>>>>>>>           return new
>> >>> KafkaConsumer<String,String>(props);
>> >>>>>>>>>>>>>>  }
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> *Code explanation:Fetching records is taking time for
>> >>> the
>> >>>>>> first
>> >>>>>>>>> poll.*
>> >>>>>>>>>>>>>> Poll Records Count: 500 diff: 1284
>> >>>>>>>>>>>>>> Poll Records Count: 500 diff: 3
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> For the first 500 records it took 1284 ms and next 500
>> >>>>> records
>> >>>>>>> it
>> >>>>>>>>> took
>> >>>>>>>>>>>>> 4 ms
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> *Why this much difference? I would like to improve the
>> >>>>>>>>> performance of
>> >>>>>>>>>>>>> the
>> >>>>>>>>>>>>>> first poll time?*
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> 1) How to fetch first 500 records in less time
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> *I am also seeing one strange issue.My kafka topic
>> >> which
>> >>>> has
>> >>>>>> one
>> >>>>>>>>>>>>> partition
>> >>>>>>>>>>>>>> contains some 5 lakh records*.*The starting records
>> >> take
>> >>>>> more
>> >>>>>>>>> time to
>> >>>>>>>>>>>>> fetch
>> >>>>>>>>>>>>>> from kafka.*
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> For example if i give "0" as input offset and it is
>> >>> taking
>> >>>>>> time
>> >>>>>>> as
>> >>>>>>>>>>>>> below (6
>> >>>>>>>>>>>>>> seconds) and not getting 500 records also,it is
>> >> getting
>> >>>> only
>> >>>>>> 200
>> >>>>>>>>>>>>> records
>> >>>>>>>>>>>>>> per poll and taking lot of time...why this is
>> >> happening
>> >>>> and
>> >>>>>> how
>> >>>>>>> to
>> >>>>>>>>>>>>> avoid
>> >>>>>>>>>>>>>> this.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Poll Records Count :292 Time taken :1227 ms
>> >>>>>>>>>>>>>> Poll Records Count :292 Time taken :1181 ms
>> >>>>>>>>>>>>>> Poll Records Count :296 Time taken:1234 ms
>> >>>>>>>>>>>>>> Poll Records Count :292 Time taken:1133 ms
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> *If I give an offset as 110999 and it is getting some
>> >>> fast
>> >>>>> and
>> >>>>>>>>> records
>> >>>>>>>>>>>>>> getting as 500 each..Why this difference please.*
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Poll Records Count :500 Time taken:1284 ms
>> >>>>>>>>>>>>>> Poll Records Count :500 Time taken:3 ms
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Please give your suggestion on this.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Regards,
>> >>>>>>>>>>>>>> Giridar
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>
>> >>>>>>>
>> >>>>>>
>> >>>>>
>> >>>>
>> >>>
>> >>
>>
>

Reply via email to