Hi Giri,

1. Why do you call `commitSync`? Your application does not seem to use a
consumer group, since your use case is random reads, right?
2. How do you calculate "diff"?
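
On the second question, for reference: a per-poll elapsed time is usually taken around the `poll` call alone, so that JSON parsing and list handling do not inflate it. The sketch below is only an illustration under that assumption, not your code. `PollTimer`, `Timed`, and `time` are hypothetical names, and the `Supplier` stands in for something like `() -> consumer.poll(Duration.ofMillis(2000))`.

```java
import java.util.function.Supplier;

// Hypothetical helper: measures how long a single call takes.
final class PollTimer {

    // Result of one timed call: the returned value plus elapsed milliseconds.
    static final class Timed<T> {
        final T value;
        final long elapsedMs;

        Timed(T value, long elapsedMs) {
            this.value = value;
            this.elapsedMs = elapsedMs;
        }
    }

    // Runs the call once and times it with the monotonic clock.
    // System.nanoTime() is not affected by wall-clock adjustments.
    static <T> Timed<T> time(Supplier<T> call) {
        long start = System.nanoTime();
        T value = call.get();
        long elapsedMs = (System.nanoTime() - start) / 1_000_000L;
        return new Timed<>(value, elapsedMs);
    }

    public static void main(String[] args) {
        // Stand-in for consumer.poll(...): sleeps briefly, then returns a record count.
        Timed<Integer> t = time(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return 500;
        });
        System.out.println("Poll Records Count :" + t.value + " diff :" + t.elapsedMs);
    }
}
```

If "diff" is instead taken around the whole loop body, it also includes the `new JSONObject(message)` call for every record, which would make the poll look slower than it is.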

On Mon, Nov 25, 2024 at 12:50 AM giri mungi <girimung...@gmail.com> wrote:

> Hi Ping,
>
> Please find the details below:
>
> 1) Kafka broker version is 3.6.1
>
> 2) Logic Explanation:
>
> The code polls messages from Kafka using consumer.poll(Duration.ofMillis(2000)).
>
> *Exit condition:* The loop exits once at least 1000 messages have been
> collected: the end flag is set to true and the loop ends.
>
>
> boolean end = false;
> consumer.seek(topicPartition, readOffset);
> do {
>     JSONObject obj = null;
>     long stime = Calendar.getInstance().getTimeInMillis();
>     ConsumerRecords<String, String> records =
>         consumer.poll(Duration.ofMillis(2000));
>     for (ConsumerRecord<String, String> consumerRecord : records) {
>         String message = consumerRecord.value();
>         obj = new JSONObject(message);
>         msglist.add(obj);
>     }
>     if (msglist.size() >= 1000) {
>         end = true;
>         consumer.commitSync();
>     }
> } while (!end);
>
>
> Thanks,
> Giri
>
> On Sun, Nov 24, 2024 at 9:23 PM Chia-Ping Tsai <chia7...@gmail.com> wrote:
>
> > hi
> >
> > 1) Could you share the broker version with us?
> > 2) Could you explain how the sample code works? What is "end"?
> >
> > ```
> >     do {
> >         ConsumerRecords < String, String > records =
> >             consumer.poll(Duration.ofMillis(1500));
> >     } while (!end)
> > ```
> >
> > thanks,
> > Chia-Ping
> >
> >
> > On Sun, Nov 24, 2024 at 11:15 PM giri mungi <girimung...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > All of them are from the same consumer in each poll.
> > >
> > > Before polling, we seek to the offset the user supplied and try to consume
> > > the next 1000 messages.
> > >
> > > Thanks
> > > Giridhar.
> > >
> > > On Sun, 24 Nov 2024 at 8:40 PM, Chia-Ping Tsai <chia7...@gmail.com> wrote:
> > >
> > > > hi
> > > >
> > > > > Poll Records Count :0    diff :2004
> > > > > Poll Records Count :500  diff :943
> > > >
> > > > Are these from the same consumer across successive polls, or from separate
> > > > consumers polling from different offsets?
> > > >
> > > > thanks,
> > > > Chia-Ping
> > > >
> > > >
> > > > On Sun, Nov 24, 2024 at 8:51 PM giri mungi <girimung...@gmail.com> wrote:
> > > >
> > > > > Do I need to check any settings at the Kafka server level?
> > > > >
> > > > > On Sun, Nov 24, 2024 at 6:19 PM giri mungi <girimung...@gmail.com> wrote:
> > > > >
> > > > > > Hi, I have set the following properties:
> > > > > >
> > > > > > props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1750000");
> > > > > > props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
> > > > > > props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "1500");
> > > > > > props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1750000");
> > > > > >
> > > > > > Poll Records Count :0    diff :2004
> > > > > > Poll Records Count :500  diff :943
> > > > > > Poll Records Count :0    diff :2000
> > > > > > Poll Records Count :44   diff :3
> > > > > > Poll Records Count :500  diff :205
> > > > > > Poll Records Count :488  diff :1978
> > > > > >
> > > > > >
> > > > > > If I set max.poll.records to 500 or 1000, some polls return a count of
> > > > > > "0" after waiting the full maximum wait time, which hurts the overall
> > > > > > response time.
> > > > > >
> > > > > > *Even though I have set fetch.min.bytes and max.poll.records, I am still
> > > > > > getting "0" records on some polls, which hurts the overall response time.*
> > > > > >
> > > > > > How can I avoid this? Please advise.
> > > > > >
> > > > > > Thanks,
> > > > > > Giridar
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Sun, Nov 24, 2024 at 5:26 PM Chia-Ping Tsai <chia7...@apache.org> wrote:
> > > > > >
> > > > > >> hi Giridar
> > > > > >>
> > > > > > >> It seems your use case involves random reads, and you expect the
> > > > > > >> consumer to return 1000 records from the server at once. Therefore, you
> > > > > > >> could increase the wait time (fetch.max.wait.ms) and fetch size
> > > > > > >> (fetch.min.bytes) to receive a larger response with as many records as
> > > > > > >> possible.
> > > > > > >>
> > > > > > >> A suitable value for fetch.min.bytes might be 'average size of records *
> > > > > > >> 1000'. You may also need to increase max.partition.fetch.bytes. For
> > > > > > >> fetch.max.wait.ms, a value of 1500 ms might match your expectations.
> > > > > >>
> > > > > >> Best,
> > > > > >> Chia-Ping
> > > > > >>
> > > > > >>
> > > > > >> On 2024/11/24 10:55:23 giri mungi wrote:
> > > > > >> > Hi Yang,
> > > > > >> >
> > > > > > >> > *This is the old code, which works perfectly well and returns all 1000
> > > > > > >> > records in less than 3 seconds.*
> > > > > >> >
> > > > > > >> > do {
> > > > > > >> >     FetchRequest req = new FetchRequestBuilder().clientId(clientName)
> > > > > > >> >         .addFetch(a_topic, a_partition, readOffset, fetchSize)
> > > > > > >> >         .build();
> > > > > > >> >
> > > > > > >> >     FetchResponse fetchResponse = consumer.fetch(req);
> > > > > > >> >
> > > > > > >> >     JSONObject obj = null;
> > > > > > >> >     ByteBufferMessageSet set = fetchResponse.messageSet(a_topic, a_partition);
> > > > > > >> >
> > > > > > >> >     for (MessageAndOffset messageAndOffset : set) {
> > > > > > >> >         String message = null;
> > > > > > >> >         if (messageAndOffset.offset() < readOffset) {
> > > > > > >> >             log.warn("Found an old offset: {} ,Expecting: {}",
> > > > > > >> >                 messageAndOffset.offset(), readOffset);
> > > > > > >> >             continue;
> > > > > > >> >         }
> > > > > > >> >
> > > > > > >> >         ByteBuffer payload = messageAndOffset.message().payload();
> > > > > > >> >
> > > > > > >> >         byte[] bytes = new byte[payload.limit()];
> > > > > > >> >         payload.get(bytes);
> > > > > > >> >         try {
> > > > > > >> >             message = new String(bytes, "UTF-8");
> > > > > > >> >         } catch (UnsupportedEncodingException ue) {
> > > > > > >> >             log.warn(ue.getMessage(), ue);
> > > > > > >> >             message = new String(bytes);
> > > > > > >> >         }
> > > > > > >> >
> > > > > > >> >         obj = new JSONObject(message);
> > > > > > >> >         msglist.add(new JSONObject(message));
> > > > > > >> >         readOffset = messageAndOffset.nextOffset();
> > > > > > >> >     }
> > > > > > >> >
> > > > > > >> >     if (msglist.size() >= Math.round(limit
> > > > > > >> >             / inputReq.getApplicationArea().getReqInfo().size())
> > > > > > >> >             || (endTime - startTime) >= waitTime) {
> > > > > > >> >         log.info(
> > > > > > >> >             "Wait condition has been met... exiting the fetch loop. recordCount - {}, time exhausted - {} ms.",
> > > > > > >> >             msglist.size(), (endTime - startTime));
> > > > > > >> >         end = true;
> > > > > > >> >     }
> > > > > > >> >
> > > > > > >> > } while (!end);
> > > > > >> >
> > > > > >> >
> > > > > > >> > *The new code takes 8 seconds, which is more than double the time.*
> > > > > >> >
> > > > > > >> > TopicPartition topicPartition = new TopicPartition("Test", 0);
> > > > > > >> > consumer.seekToBeginning(Collections.singletonList(topicPartition));
> > > > > > >> > long kafkaEarliestOffset = consumer.position(topicPartition);
> > > > > > >> > try (KafkaConsumer<String, String> consumer =
> > > > > > >> >         KafkaConsumerFactory.createConsumer(clientName, fetchSize)) {
> > > > > > >> >     consumer.partitionsFor(topicName);
> > > > > > >> >     consumer.assign(Collections.singletonList(topicPartition));
> > > > > > >> >     consumer.seek(topicPartition, readOffset);
> > > > > > >> >     do {
> > > > > > >> >         ConsumerRecords<String, String> records =
> > > > > > >> >             consumer.poll(Duration.ofMillis(1500));
> > > > > > >> >     } while (!end)
> > > > > > >> >
> > > > > > >> > public static KafkaConsumer<String, String> createConsumer(String clientName, int fetchSize) {
> > > > > > >> >     Properties props = new Properties();
> > > > > > >> >     String kafkaBrokerStr = Config.getConsumerPropValue("kafkabrokerslist");
> > > > > > >> >     String groupId = Config.getConsumerPropValue("group.id");
> > > > > > >> >     props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> > > > > > >> >         StringDeserializer.class.getName());
> > > > > > >> >     props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> > > > > > >> >         StringDeserializer.class.getName());
> > > > > > >> >     props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
> > > > > > >> >     props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
> > > > > > >> >     props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> > > > > > >> >     props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
> > > > > > >> >     props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
> > > > > > >> >     props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
> > > > > > >> >     return new KafkaConsumer<String, String>(props);
> > > > > > >> > }
> > > > > >> >
> > > > > > >> > How can I improve the performance of this?
> > > > > >> >
> > > > > > >> > On Sun, Nov 24, 2024 at 4:11 PM giri mungi <girimung...@gmail.com> wrote:
> > > > > >> >
> > > > > >> > > Hi Yang,
> > > > > >> > >
> > > > > > >> > > Can I get the records from Kafka as raw bytes or in compressed form, so
> > > > > > >> > > that fetching them from Kafka takes less time?
> > > > > > >> > >
> > > > > > >> > > I can build the messages from those bytes. Is that possible?
> > > > > > >> > >
> > > > > > >> > > Could you please give suggestions on this?
> > > > > >> > >
> > > > > >> > > Thanks,
> > > > > >> > > Giridar
> > > > > >> > >
> > > > > > >> > > On Sun, Nov 24, 2024 at 3:50 PM giri mungi <girimung...@gmail.com> wrote:
> > > > > >> > >
> > > > > >> > >> Hi Yang,
> > > > > >> > >>
> > > > > >> > >> Thanks for your reply.
> > > > > >> > >>
> > > > > > >> > >> Now what should I do to improve my performance? The old Kafka code
> > > > > > >> > >> performed well.
> > > > > > >> > >>
> > > > > > >> > >> These are the properties:
> > > > > > >> > >>
> > > > > > >> > >> props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
> > > > > > >> > >> props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
> > > > > > >> > >> props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> > > > > > >> > >> props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
> > > > > > >> > >> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
> > > > > > >> > >> props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
> > > > > > >> > >>
> > > > > > >> > >> How can I decrease this time? Please suggest.
> > > > > >> > >>
> > > > > >> > >> Thanks,
> > > > > >> > >> Giridar
> > > > > >> > >>
> > > > > > >> > >> On Sun, Nov 24, 2024 at 3:15 PM PoAn Yang <yangp...@gmail.com> wrote:
> > > > > >> > >>
> > > > > >> > >>> Hi Giridar,
> > > > > >> > >>>
> > > > > > >> > >>> > *Code explanation: Fetching records takes time on the first poll.*
> > > > > > >> > >>> > Poll Records Count: 500 diff: 1284
> > > > > > >> > >>> > Poll Records Count: 500 diff: 3
> > > > > > >> > >>> >
> > > > > > >> > >>> > The first 500 records took 1284 ms and the next 500 records took 3 ms.
> > > > > > >> > >>> >
> > > > > > >> > >>> > *Why is there so much difference? I would like to improve the first
> > > > > > >> > >>> > poll time.*
> > > > > >> > >>>
> > > > > > >> > >>> IIRC, a consumer has a FetchBuffer (cache).
> > > > > > >> > >>> If a FetchResponse returns more records than `max.poll.records`, the
> > > > > > >> > >>> extra records are stored in the FetchBuffer.
> > > > > > >> > >>> That is why the second poll can get data in a short time.
> > > > > > >> > >>> IMO, we don’t need to balance the time of each poll, because that would
> > > > > > >> > >>> mean we don’t want a local cache.
> > > > > > >> > >>> > For example, if I give "0" as the input offset, it takes the time shown
> > > > > > >> > >>> > below (6 seconds) and does not return 500 records per poll; it returns
> > > > > > >> > >>> > only 200-odd records per poll and takes a lot of time. Why is this
> > > > > > >> > >>> > happening, and how can I avoid it?
> > > > > >> > >>> >
> > > > > >> > >>> > Poll Records Count :292 Time taken :1227 ms
> > > > > >> > >>> > Poll Records Count :292 Time taken :1181 ms
> > > > > >> > >>> > Poll Records Count :296 Time taken:1234 ms
> > > > > >> > >>> > Poll Records Count :292 Time taken:1133 ms
> > > > > >> > >>> >
> > > > > > >> > >>> > *If I give 110999 as the offset, it is faster and each poll returns 500
> > > > > > >> > >>> > records. Why is there this difference?*
> > > > > >> > >>> >
> > > > > >> > >>> > Poll Records Count :500 Time taken:1284 ms
> > > > > >> > >>> > Poll Records Count :500 Time taken:3 ms
> > > > > >> > >>>
> > > > > > >> > >>> IIUC, a FetchRequest is limited by `fetch.max.bytes`.
> > > > > > >> > >>> If the records starting at offset “0” are larger than those starting at
> > > > > > >> > >>> offset “110999”, then a FetchResponse returns fewer records.
> > > > > >> > >>>
> > > > > >> > >>> Please correct me if I misunderstood anything.
> > > > > >> > >>>
> > > > > >> > >>> Thanks,
> > > > > >> > >>> PoAn
> > > > > >> > >>>
> > > > > > >> > >>> > On Nov 24, 2024, at 2:09 PM, giri mungi <girimung...@gmail.com> wrote:
> > > > > >> > >>> >
> > > > > >> > >>> > Hi Team,
> > > > > >> > >>> >
> > > > > >> > >>> > Good day to you.
> > > > > >> > >>> >
> > > > > > >> > >>> > I am Giridhar. I need your suggestions on improving Kafka performance,
> > > > > > >> > >>> > please.
> > > > > > >> > >>> >
> > > > > > >> > >>> > *Scenario: The user gives an offset as input, and based on that offset we
> > > > > > >> > >>> > need to return the next 1000 messages from the Kafka topic together with
> > > > > > >> > >>> > the next offset. The Kafka topic contains only one partition.*
> > > > > > >> > >>> >
> > > > > > >> > >>> > We are trying to migrate from the old Kafka to the new Kafka. With the
> > > > > > >> > >>> > old Kafka we were using code like:
> > > > > >> > >>> >
> > > > > > >> > >>> > *Old code (kafka clients 0.8.1):*
> > > > > > >> > >>> >
> > > > > > >> > >>> > FetchRequest req = new FetchRequestBuilder().clientId(clientName)
> > > > > > >> > >>> >     .addFetch(a_topic, a_partition, readOffset, fetchSize).build();
> > > > > > >> > >>> > FetchResponse fetchResponse = consumer.fetch(req);
> > > > > > >> > >>> > ByteBufferMessageSet set = fetchResponse.messageSet(a_topic, a_partition);
> > > > > >> > >>> >
> > > > > > >> > >>> > This code is super fast. We are trying to achieve the same with the
> > > > > > >> > >>> > KafkaConsumer API, but it is slow.
> > > > > > >> > >>> >
> > > > > > >> > >>> > *New KafkaConsumer code (kafka clients 3.6.1):*
> > > > > >> > >>> >
> > > > > > >> > >>> > TopicPartition topicPartition = new TopicPartition("Test", 0);
> > > > > > >> > >>> > consumer.seekToBeginning(Collections.singletonList(topicPartition));
> > > > > > >> > >>> > long kafkaEarliestOffset = consumer.position(topicPartition);
> > > > > > >> > >>> > try (KafkaConsumer<String, String> consumer =
> > > > > > >> > >>> >         KafkaConsumerFactory.createConsumer(clientName, fetchSize)) {
> > > > > > >> > >>> >     consumer.assign(Collections.singletonList(topicPartition));
> > > > > > >> > >>> >     consumer.seek(topicPartition, readOffset);
> > > > > > >> > >>> >     do {
> > > > > > >> > >>> >         ConsumerRecords<String, String> records =
> > > > > > >> > >>> >             consumer.poll(Duration.ofMillis(1500));
> > > > > > >> > >>> >     } while (!end)
> > > > > > >> > >>> >
> > > > > > >> > >>> > public static KafkaConsumer<String, String> createConsumer(String clientName, int fetchSize) {
> > > > > > >> > >>> >     Properties props = new Properties();
> > > > > > >> > >>> >     String kafkaBrokerStr = Config.getConsumerPropValue("kafkabrokerlist");
> > > > > > >> > >>> >     props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> > > > > > >> > >>> >         StringDeserializer.class.getName());
> > > > > > >> > >>> >     props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> > > > > > >> > >>> >         StringDeserializer.class.getName());
> > > > > > >> > >>> >     props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
> > > > > > >> > >>> >     props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
> > > > > > >> > >>> >     props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> > > > > > >> > >>> >     props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
> > > > > > >> > >>> >     props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
> > > > > > >> > >>> >     props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
> > > > > > >> > >>> >     return new KafkaConsumer<String, String>(props);
> > > > > > >> > >>> > }
> > > > > >> > >>> >
> > > > > > >> > >>> > *Code explanation: Fetching records takes time on the first poll.*
> > > > > > >> > >>> > Poll Records Count: 500 diff: 1284
> > > > > > >> > >>> > Poll Records Count: 500 diff: 3
> > > > > > >> > >>> >
> > > > > > >> > >>> > The first 500 records took 1284 ms and the next 500 records took 3 ms.
> > > > > > >> > >>> >
> > > > > > >> > >>> > *Why is there so much difference? I would like to improve the first
> > > > > > >> > >>> > poll time.*
> > > > > >> > >>> >
> > > > > >> > >>> >
> > > > > > >> > >>> > 1) How can I fetch the first 500 records in less time?
> > > > > > >> > >>> >
> > > > > > >> > >>> > *I am also seeing one strange issue. My Kafka topic, which has one
> > > > > > >> > >>> > partition, contains some 5 lakh (500,000) records. The records at the
> > > > > > >> > >>> > start take more time to fetch from Kafka.*
> > > > > >> > >>> >
> > > > > > >> > >>> > For example, if I give "0" as the input offset, it takes the time shown
> > > > > > >> > >>> > below (6 seconds) and does not return 500 records per poll; it returns
> > > > > > >> > >>> > only 200-odd records per poll and takes a lot of time. Why is this
> > > > > > >> > >>> > happening, and how can I avoid it?
> > > > > >> > >>> >
> > > > > >> > >>> > Poll Records Count :292 Time taken :1227 ms
> > > > > >> > >>> > Poll Records Count :292 Time taken :1181 ms
> > > > > >> > >>> > Poll Records Count :296 Time taken:1234 ms
> > > > > >> > >>> > Poll Records Count :292 Time taken:1133 ms
> > > > > >> > >>> >
> > > > > > >> > >>> > *If I give 110999 as the offset, it is faster and each poll returns 500
> > > > > > >> > >>> > records. Why is there this difference?*
> > > > > >> > >>> >
> > > > > >> > >>> > Poll Records Count :500 Time taken:1284 ms
> > > > > >> > >>> > Poll Records Count :500 Time taken:3 ms
> > > > > >> > >>> >
> > > > > >> > >>> >
> > > > > >> > >>> >
> > > > > >> > >>> > Please give your suggestion on this.
> > > > > >> > >>> >
> > > > > >> > >>> > Regards,
> > > > > >> > >>> > Giridar
> > > > > >> > >>>
> > > > > >> > >>>
> > > > > >> >
> > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
>
