hi

> Poll Records Count :0    diff :2004
> Poll Records Count :500  diff :943

Do these two lines come from the same consumer, one line per poll? Or do
they come from separate consumers polling from different offsets?
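
To make that easier to answer, each log line could carry the consumer and the
start offset it belongs to. Below is a minimal sketch of the idea, not your
actual code: `PollTrace`, `Entry`, and `trace` are hypothetical names, and the
`Supplier` is only a stand-in for `consumer.poll(...)` so the example runs
without a broker.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical helper (not part of the Kafka client): one tagged entry per poll. */
final class PollTrace {

    /** One log line: which consumer, which start offset, and what the poll returned. */
    static final class Entry {
        final String consumerId;
        final long startOffset;
        final int pollIndex;
        final int count;
        final long elapsedMs;

        Entry(String consumerId, long startOffset, int pollIndex, int count, long elapsedMs) {
            this.consumerId = consumerId;
            this.startOffset = startOffset;
            this.pollIndex = pollIndex;
            this.count = count;
            this.elapsedMs = elapsedMs;
        }

        @Override
        public String toString() {
            return "consumer=" + consumerId + " startOffset=" + startOffset
                + " poll#" + pollIndex + " count=" + count + " diff=" + elapsedMs;
        }
    }

    /**
     * Calls the poller until at least target records have arrived or maxPolls
     * polls were made. The Supplier is an assumption standing in for
     * consumer.poll(...); each call is timed and recorded separately.
     */
    static List<Entry> trace(String consumerId, long startOffset, int target,
                             int maxPolls, Supplier<List<String>> poller) {
        List<Entry> entries = new ArrayList<>();
        int total = 0;
        for (int i = 0; i < maxPolls && total < target; i++) {
            long t0 = System.nanoTime();
            List<String> batch = poller.get();
            long elapsedMs = (System.nanoTime() - t0) / 1_000_000L;
            total += batch.size();
            entries.add(new Entry(consumerId, startOffset, i, batch.size(), elapsedMs));
        }
        return entries;
    }

    public static void main(String[] args) {
        // Fake batches instead of a real broker: empty, then 500, then 500.
        Iterator<List<String>> batches = Arrays.asList(
            Collections.<String>emptyList(),
            Collections.nCopies(500, "r"),
            Collections.nCopies(500, "r")).iterator();
        for (Entry e : trace("consumer-1", 0L, 1000, 10, batches::next)) {
            System.out.println(e);
        }
    }
}
```

With lines tagged like this, it is clear whether a "0" count and the "500"
count after it belong to one request or to two different ones.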

thanks,
Chia-Ping


On Sun, Nov 24, 2024 at 8:51 PM giri mungi <girimung...@gmail.com> wrote:

> Do I need to check any settings at the Kafka server level?
>
> On Sun, Nov 24, 2024 at 6:19 PM giri mungi <girimung...@gmail.com> wrote:
>
> > Hi, I have set the following properties:
> >
> > props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,"1750000");
> > props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
> > props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "1500");
> > props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1750000");
> >
> > Poll Records Count :0    diff :2004
> > Poll Records Count :500  diff :943
> > Poll Records Count :0    diff :2000
> > Poll Records Count :44   diff :3
> > Poll Records Count :500  diff :205
> > Poll Records Count :488  diff :1978
> >
> >
> > If I set max.poll.records to 500 or 1000, I sometimes get a count of "0",
> > and that poll takes the maximum wait time, which hurts the overall
> > response time.
> >
> > *Even though I have set fetch.min.bytes and max.poll.records, I am still
> > getting "0" records, which hurts the overall response time.*
> >
> > How can I avoid this? Please advise.
> >
> > Thanks,
> > Giridar
> >
> >
> >
> > On Sun, Nov 24, 2024 at 5:26 PM Chia-Ping Tsai <chia7...@apache.org>
> > wrote:
> >
> >> hi Giridar
> >>
> >> It seems your use case involves random reads, and you expect the
> >> consumer to return 1000 records from server at once. Therefore, you
> >> could increase the wait time (fetch.max.wait.ms) and fetch size
> >> (fetch.min.bytes) to receive a larger response with as many records as
> >> possible.
> >>
> >> A suitable value for fetch.min.bytes might be 'average size of records *
> >> 1000'. You may also need to increase max.partition.fetch.bytes. For
> >> fetch.max.wait.ms, a value of 1500 ms might match your expectations.
> >>
> >> Best,
> >> Chia-Ping
> >>
> >>
> >> On 2024/11/24 10:55:23 giri mungi wrote:
> >> > Hi Yang,
> >> >
> >> > *This is the old code, which works fine and returns all 1000 records
> >> > in less than 3 seconds.*
> >> >
> >> > do {
> >> >     FetchRequest req = new FetchRequestBuilder().clientId(clientName)
> >> >         .addFetch(a_topic, a_partition, readOffset, fetchSize)
> >> >         .build();
> >> >
> >> >     FetchResponse fetchResponse = consumer.fetch(req);
> >> >
> >> >     JSONObject obj = null;
> >> >     ByteBufferMessageSet set = fetchResponse.messageSet(a_topic,
> >> >         a_partition);
> >> >
> >> >     for (MessageAndOffset messageAndOffset : set) {
> >> >         String message = null;
> >> >         if (messageAndOffset.offset() < readOffset) {
> >> >             log.warn("Found an old offset: {} ,Expecting: {}",
> >> >                 messageAndOffset.offset(), readOffset);
> >> >             continue;
> >> >         }
> >> >
> >> >         ByteBuffer payload = messageAndOffset.message().payload();
> >> >
> >> >         byte[] bytes = new byte[payload.limit()];
> >> >         payload.get(bytes);
> >> >         try {
> >> >             message = new String(bytes, "UTF-8");
> >> >         } catch (UnsupportedEncodingException ue) {
> >> >             log.warn(ue.getMessage(), ue);
> >> >             message = new String(bytes);
> >> >         }
> >> >
> >> >         obj = new JSONObject(message);
> >> >         msglist.add(new JSONObject(message));
> >> >         // Advance inside the loop, where messageAndOffset is in scope.
> >> >         readOffset = messageAndOffset.nextOffset();
> >> >     }
> >> >
> >> >     if (msglist.size() >= Math.round(limit
> >> >             / inputReq.getApplicationArea().getReqInfo().size())
> >> >             || (endTime - startTime) >= waitTime) {
> >> >         log.info(
> >> >             "Wait condition has been met... exiting the fetch loop. "
> >> >                 + "recordCount - {}, time exhausted - {} ms.",
> >> >             msglist.size(), (endTime - startTime));
> >> >         end = true;
> >> >     }
> >> >
> >> > } while (!end);
> >> >
> >> >
> >> > *The new code takes 8 seconds, which is more than double the time.*
> >> >
> >> > TopicPartition topicPartition = new TopicPartition("Test", 0);
> >> > consumer.seekToBeginning(Collections.singletonList(topicPartition));
> >> > long kafkaEarliestOffset = consumer.position(topicPartition);
> >> > try (KafkaConsumer<String, String> consumer =
> >> >         KafkaConsumerFactory.createConsumer(clientName, fetchSize)) {
> >> >     consumer.partitionsFor(topicName);
> >> >     consumer.assign(Collections.singletonList(topicPartition));
> >> >     consumer.seek(topicPartition, readOffset);
> >> >     do {
> >> >         ConsumerRecords<String, String> records =
> >> >             consumer.poll(Duration.ofMillis(1500));
> >> >     } while (!end);
> >> > }
> >> >
> >> > public static KafkaConsumer<String, String> createConsumer(
> >> >         String clientName, int fetchSize) {
> >> >     Properties props = new Properties();
> >> >     String kafkaBrokerStr =
> >> >         Config.getConsumerPropValue("kafkabrokerslist");
> >> >     String groupId = Config.getConsumerPropValue("group.id");
> >> >     props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> >> >         StringDeserializer.class.getName());
> >> >     props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> >> >         StringDeserializer.class.getName());
> >> >     props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
> >> >     props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
> >> >     props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> >> >     props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
> >> >     props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
> >> >     props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
> >> >     return new KafkaConsumer<String, String>(props);
> >> > }
> >> >
> >> > How can I improve the performance of this?
> >> >
> >> > On Sun, Nov 24, 2024 at 4:11 PM giri mungi <girimung...@gmail.com> wrote:
> >> >
> >> > > Hi Yang,
> >> > >
> >> > > Can I get the records from Kafka as raw bytes or in compressed form,
> >> > > so that fetching them from Kafka takes less time?
> >> > >
> >> > > I can build the messages from those bytes. Is that possible?
> >> > >
> >> > > Can you please give suggestions on this?
> >> > >
> >> > > Thanks,
> >> > > Giridar
> >> > >
> >> > > On Sun, Nov 24, 2024 at 3:50 PM giri mungi <girimung...@gmail.com> wrote:
> >> > >
> >> > >> Hi Yang,
> >> > >>
> >> > >> Thanks for your reply.
> >> > >>
> >> > >> What should I do now to improve performance? The old Kafka code
> >> > >> performed well.
> >> > >>
> >> > >> These are the properties:
> >> > >>
> >> > >> props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
> >> > >> props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
> >> > >> props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> >> > >> props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
> >> > >> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
> >> > >> props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
> >> > >>
> >> > >> How can I decrease this time? Please suggest.
> >> > >>
> >> > >> Thanks,
> >> > >> Giridar
> >> > >>
> >> > >> On Sun, Nov 24, 2024 at 3:15 PM PoAn Yang <yangp...@gmail.com> wrote:
> >> > >>
> >> > >>> Hi Giridar,
> >> > >>>
> >> > >>> > *Code explanation: fetching records takes a long time on the first
> >> > >>> > poll.*
> >> > >>> > Poll Records Count: 500 diff: 1284
> >> > >>> > Poll Records Count: 500 diff: 3
> >> > >>> >
> >> > >>> > The first 500 records took 1284 ms and the next 500 records took
> >> > >>> > 4 ms.
> >> > >>> >
> >> > >>> > *Why is there so much difference? I would like to improve the time of
> >> > >>> > the first poll.*
> >> > >>>
> >> > >>> IIRC, a consumer has a FetchBuffer (cache).
> >> > >>> If a FetchResponse returns more records than `max.poll.records`, the
> >> > >>> extra records are stored in the FetchBuffer.
> >> > >>> That is why the second poll can get data in a short time.
> >> > >>> IMO, we don't need to balance the time of each poll, because that would
> >> > >>> mean we don't want a local cache.
> >> > >>>
> >> > >>> > For example, if I give "0" as the input offset, it takes the times
> >> > >>> > shown below (6 seconds) and does not even return 500 records: it gets
> >> > >>> > only 200 records per poll and takes a lot of time. Why is this
> >> > >>> > happening, and how can I avoid it?
> >> > >>> >
> >> > >>> > Poll Records Count :292 Time taken :1227 ms
> >> > >>> > Poll Records Count :292 Time taken :1181 ms
> >> > >>> > Poll Records Count :296 Time taken:1234 ms
> >> > >>> > Poll Records Count :292 Time taken:1133 ms
> >> > >>> >
> >> > >>> > *If I give 110999 as the offset, it is faster and returns 500 records
> >> > >>> > per poll. Why is there this difference?*
> >> > >>> >
> >> > >>> > Poll Records Count :500 Time taken:1284 ms
> >> > >>> > Poll Records Count :500 Time taken:3 ms
> >> > >>>
> >> > >>> IIUC, a FetchRequest is limited by `fetch.max.bytes`.
> >> > >>> If the records starting at offset "0" are larger than those starting at
> >> > >>> offset "110999", a FetchResponse returns fewer records.
> >> > >>>
> >> > >>> Please correct me if I misunderstood anything.
> >> > >>>
> >> > >>> Thanks,
> >> > >>> PoAn
> >> > >>>
> >> > >>> > On Nov 24, 2024, at 2:09 PM, giri mungi <girimung...@gmail.com> wrote:
> >> > >>> >
> >> > >>> > Hi Team,
> >> > >>> >
> >> > >>> > Good day to you.
> >> > >>> >
> >> > >>> > I am Giridhar. I need your suggestions on improving Kafka
> >> > >>> > performance, please.
> >> > >>> >
> >> > >>> > *Scenario: the user gives an offset as input, and based on that offset
> >> > >>> > we need to return the next 1000 messages from the Kafka topic, plus
> >> > >>> > the next offset. The Kafka topic contains only one partition.*
> >> > >>> >
> >> > >>> > We are trying to migrate from old Kafka to new Kafka. In the old Kafka
> >> > >>> > we were using code like:
> >> > >>> >
> >> > >>> > *Old code (kafka clients 0.8.1):*
> >> > >>> >
> >> > >>> > FetchRequest req = new FetchRequestBuilder().clientId(clientName)
> >> > >>> >         .addFetch(a_topic, a_partition, readOffset, fetchSize).build();
> >> > >>> > FetchResponse fetchResponse = consumer.fetch(req);
> >> > >>> > ByteBufferMessageSet set =
> >> > >>> >         fetchResponse.messageSet(a_topic, a_partition);
> >> > >>> >
> >> > >>> > This code is super fast. We are trying to achieve the same with the
> >> > >>> > KafkaConsumer API, but it is slow.
> >> > >>> >
> >> > >>> > *New KafkaConsumer code (kafka clients 3.6.1):*
> >> > >>> >
> >> > >>> > TopicPartition topicPartition = new TopicPartition("Test", 0);
> >> > >>> > consumer.seekToBeginning(Collections.singletonList(topicPartition));
> >> > >>> > long kafkaEarliestOffset = consumer.position(topicPartition);
> >> > >>> > try (KafkaConsumer<String, String> consumer =
> >> > >>> >         KafkaConsumerFactory.createConsumer(clientName, fetchSize)) {
> >> > >>> >     consumer.assign(Collections.singletonList(topicPartition));
> >> > >>> >     consumer.seek(topicPartition, readOffset);
> >> > >>> >     do {
> >> > >>> >         ConsumerRecords<String, String> records =
> >> > >>> >             consumer.poll(Duration.ofMillis(1500));
> >> > >>> >     } while (!end);
> >> > >>> > }
> >> > >>> >
> >> > >>> > public static KafkaConsumer<String, String> createConsumer(
> >> > >>> >         String clientName, int fetchSize) {
> >> > >>> >     Properties props = new Properties();
> >> > >>> >     String kafkaBrokerStr =
> >> > >>> >         Config.getConsumerPropValue("kafkabrokerlist");
> >> > >>> >     props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> >> > >>> >         StringDeserializer.class.getName());
> >> > >>> >     props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> >> > >>> >         StringDeserializer.class.getName());
> >> > >>> >     props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
> >> > >>> >     props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
> >> > >>> >     props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> >> > >>> >     props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
> >> > >>> >     props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
> >> > >>> >     props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
> >> > >>> >     return new KafkaConsumer<String, String>(props);
> >> > >>> > }
> >> > >>> >
> >> > >>> > *Code explanation: fetching records takes a long time on the first
> >> > >>> > poll.*
> >> > >>> > Poll Records Count: 500 diff: 1284
> >> > >>> > Poll Records Count: 500 diff: 3
> >> > >>> >
> >> > >>> > The first 500 records took 1284 ms and the next 500 records took
> >> > >>> > 4 ms.
> >> > >>> >
> >> > >>> > *Why is there so much difference? I would like to improve the time of
> >> > >>> > the first poll.*
> >> > >>> >
> >> > >>> > 1) How can I fetch the first 500 records in less time?
> >> > >>> >
> >> > >>> > *I am also seeing one strange issue. My Kafka topic, which has one
> >> > >>> > partition, contains about 5 lakh (500,000) records. The starting
> >> > >>> > records take more time to fetch from Kafka.*
> >> > >>> >
> >> > >>> > For example, if I give "0" as the input offset, it takes the times
> >> > >>> > shown below (6 seconds) and does not even return 500 records: it gets
> >> > >>> > only 200 records per poll and takes a lot of time. Why is this
> >> > >>> > happening, and how can I avoid it?
> >> > >>> >
> >> > >>> > Poll Records Count :292 Time taken :1227 ms
> >> > >>> > Poll Records Count :292 Time taken :1181 ms
> >> > >>> > Poll Records Count :296 Time taken:1234 ms
> >> > >>> > Poll Records Count :292 Time taken:1133 ms
> >> > >>> >
> >> > >>> > *If I give 110999 as the offset, it is faster and returns 500 records
> >> > >>> > per poll. Why is there this difference?*
> >> > >>> >
> >> > >>> > Poll Records Count :500 Time taken:1284 ms
> >> > >>> > Poll Records Count :500 Time taken:3 ms
> >> > >>> >
> >> > >>> > Please give your suggestions on this.
> >> > >>> >
> >> > >>> > Regards,
> >> > >>> > Giridar
> >> > >>>
> >> > >>>
> >> >
> >>
> >
>
