Hi,

I have set the properties below:

    props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1750000");
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
    props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "1500");
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1750000");

With these settings the polls behave like this:

    Poll Records Count :0   diff :2004
    Poll Records Count :500 diff :943
    Poll Records Count :0   diff :2000
    Poll Records Count :44  diff :3
    Poll Records Count :500 diff :205
    Poll Records Count :488 diff :1978
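For reference, this is roughly the measurement loop that produces the
numbers above (a sketch only; "Test" is my single-partition test topic,
readOffset is the user-supplied starting offset, and "diff" is the poll
time in ms):

    TopicPartition tp = new TopicPartition("Test", 0);
    consumer.assign(Collections.singletonList(tp));
    consumer.seek(tp, readOffset);
    while (!end) {
        long start = System.currentTimeMillis();
        ConsumerRecords<String, String> records =
                consumer.poll(Duration.ofMillis(1500));  // fetch.max.wait.ms = 1500
        long diff = System.currentTimeMillis() - start;
        System.out.println("Poll Records Count :" + records.count()
                + " diff :" + diff);
        // ... process records and set 'end' once enough have been read ...
    }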
If I set max.poll.records to 500 or 1000, some polls return a count of
"0" and block for the full fetch.max.wait.ms, which hurts the overall
response time. *Even though I have set fetch.min.bytes and
max.poll.records, I am still getting "0" records, which is impacting
the overall response time.* How can I avoid this? Please suggest.

Thanks,
Giridar

On Sun, Nov 24, 2024 at 5:26 PM Chia-Ping Tsai <chia7...@apache.org> wrote:

> hi Giridar
>
> It seems your use case involves random reads, and you expect the
> consumer to return 1000 records from the server at once. Therefore,
> you could increase the wait time (fetch.max.wait.ms) and fetch size
> (fetch.min.bytes) to receive a larger response with as many records
> as possible.
>
> A suitable value for fetch.min.bytes might be 'average size of records
> * 1000'. You may also need to increase max.partition.fetch.bytes. For
> fetch.max.wait.ms, a value of 1500 ms might match your expectations.
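> For illustration, a rough sketch of these settings; the 1750-byte
> average record size is only an assumption, so measure your actual
> records and adjust:
>
>     int avgRecordBytes = 1750;   // assumed average record size
>     int targetRecords = 1000;    // desired records per response
>     int targetBytes = avgRecordBytes * targetRecords;  // 1,750,000
>     props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,
>             String.valueOf(targetBytes));
>     props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
>             String.valueOf(targetBytes));
>     props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "1500");
>     props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");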
>
> Best,
> Chia-Ping
>
> On 2024/11/24 10:55:23 giri mungi wrote:
> > Hi Yang,
> >
> > *This is the old code, which performs fine and returns all 1000
> > records in less than 3 seconds.*
> >
> > do {
> >     FetchRequest req = new FetchRequestBuilder().clientId(clientName)
> >             .addFetch(a_topic, a_partition, readOffset, fetchSize)
> >             .build();
> >
> >     FetchResponse fetchResponse = consumer.fetch(req);
> >     ByteBufferMessageSet set = fetchResponse.messageSet(a_topic, a_partition);
> >
> >     for (MessageAndOffset messageAndOffset : set) {
> >         String message = null;
> >         if (messageAndOffset.offset() < readOffset) {
> >             log.warn("Found an old offset: {} ,Expecting: {}",
> >                     messageAndOffset.offset(), readOffset);
> >             continue;
> >         }
> >
> >         ByteBuffer payload = messageAndOffset.message().payload();
> >         byte[] bytes = new byte[payload.limit()];
> >         payload.get(bytes);
> >         try {
> >             message = new String(bytes, "UTF-8");
> >         } catch (UnsupportedEncodingException ue) {
> >             log.warn(ue.getMessage(), ue);
> >             message = new String(bytes);
> >         }
> >
> >         msglist.add(new JSONObject(message));
> >         // advance the offset as records are consumed
> >         readOffset = messageAndOffset.nextOffset();
> >     }
> >
> >     if (msglist.size() >= Math.round(limit
> >             / inputReq.getApplicationArea().getReqInfo().size())
> >             || (endTime - startTime) >= waitTime) {
> >         log.info("Wait condition has been met... exiting the fetch loop."
> >                 + " recordCount - {}, time exhausted - {} ms.",
> >                 msglist.size(), (endTime - startTime));
> >         end = true;
> >     }
> > } while (!end);
> >
> > *The new code is taking 8 seconds, which is more than double the time.*
> >
> > TopicPartition topicPartition = new TopicPartition("Test", 0);
> > try (KafkaConsumer<String, String> consumer =
> >         KafkaConsumerFactory.createConsumer(clientName, fetchSize)) {
> >     consumer.partitionsFor(topicName);
> >     consumer.assign(Collections.singletonList(topicPartition));
> >     consumer.seekToBeginning(Collections.singletonList(topicPartition));
> >     long kafkaEarliestOffset = consumer.position(topicPartition);
> >     consumer.seek(topicPartition, readOffset);
> >     do {
> >         ConsumerRecords<String, String> records =
> >                 consumer.poll(Duration.ofMillis(1500));
> >         // ... process records and set 'end' when done ...
> >     } while (!end);
> > }
> >
> > public static KafkaConsumer<String, String> createConsumer(String clientName, int fetchSize) {
> >     Properties props = new Properties();
> >     String kafkaBrokerStr = Config.getConsumerPropValue("kafkabrokerslist");
> >     String groupId = Config.getConsumerPropValue("group.id");
> >     props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerStr);
> >     props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> >             StringDeserializer.class.getName());
> >     props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> >             StringDeserializer.class.getName());
> >     props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
> >     props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
> >     props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> >     props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
> >     props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
> >     props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
> >     return new KafkaConsumer<String, String>(props);
> > }
> >
> > How can I improve the performance of this?
> >
> > On Sun, Nov 24, 2024 at 4:11 PM giri mungi <girimung...@gmail.com> wrote:
> >
> > > Hi Yang,
> > >
> > > Can I get the records from Kafka as bytes, or in compressed form, so
> > > that fetching takes less time? I can build the messages from those
> > > bytes myself. Is that possible?
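> > > Something like this sketch is what I mean, assuming
> > > ByteArrayDeserializer can simply replace StringDeserializer (if I
> > > understand correctly, the client still decompresses the batches,
> > > but no String is built per record unless we need it):
> > >
> > >     props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> > >             ByteArrayDeserializer.class.getName());
> > >     props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> > >             ByteArrayDeserializer.class.getName());
> > >     KafkaConsumer<byte[], byte[]> rawConsumer = new KafkaConsumer<>(props);
> > >     ...
> > >     for (ConsumerRecord<byte[], byte[]> record :
> > >             rawConsumer.poll(Duration.ofMillis(1500))) {
> > >         // build the message lazily, only when it is actually needed
> > >         String message = new String(record.value(), StandardCharsets.UTF_8);
> > >     }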
> > >
> > > Can you please give suggestions on this?
> > >
> > > Thanks,
> > > Giridar
> > >
> > > On Sun, Nov 24, 2024 at 3:50 PM giri mungi <girimung...@gmail.com> wrote:
> > >
> > >> Hi Yang,
> > >>
> > >> Thanks for your reply.
> > >>
> > >> Now what should I do to improve performance? The old Kafka code
> > >> performed well.
> > >>
> > >> These are the properties:
> > >>
> > >>     props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
> > >>     props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
> > >>     props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> > >>     props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
> > >>     props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
> > >>     props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
> > >>
> > >> How can I decrease this time? Please suggest.
> > >>
> > >> Thanks,
> > >> Giridar
> > >>
> > >> On Sun, Nov 24, 2024 at 3:15 PM PoAn Yang <yangp...@gmail.com> wrote:
> > >>
> > >>> Hi Giridar,
> > >>>
> > >>> > *Code explanation: Fetching records is taking time for the first poll.*
> > >>> > Poll Records Count: 500 diff: 1284
> > >>> > Poll Records Count: 500 diff: 3
> > >>> >
> > >>> > For the first 500 records it took 1284 ms; the next 500 records
> > >>> > took 3 ms.
> > >>> >
> > >>> > *Why this much difference? I would like to improve the performance
> > >>> > of the first poll.*
> > >>>
> > >>> IIRC, a consumer has a FetchBuffer (cache).
> > >>> If a FetchResponse can return more records than `max.poll.records`,
> > >>> the extra records are stored in the FetchBuffer.
> > >>> That is why the second poll can get data in a short time.
> > >>> IMO, we don't need to balance each poll's time, because that would
> > >>> mean giving up the local cache.
> > >>>
> > >>> > For example, if I give "0" as the input offset, it takes about 6
> > >>> > seconds (see below) and does not even return 500 records; each
> > >>> > poll gets only around 290 records and takes a lot of time. Why is
> > >>> > this happening and how can I avoid it?
> > >>> >
> > >>> > Poll Records Count :292 Time taken :1227 ms
> > >>> > Poll Records Count :292 Time taken :1181 ms
> > >>> > Poll Records Count :296 Time taken :1234 ms
> > >>> > Poll Records Count :292 Time taken :1133 ms
> > >>> >
> > >>> > *If I give the offset as 110999, it is somewhat faster and each
> > >>> > poll returns 500 records. Why this difference?*
> > >>> >
> > >>> > Poll Records Count :500 Time taken :1284 ms
> > >>> > Poll Records Count :500 Time taken :3 ms
> > >>>
> > >>> IIUC, a FetchRequest has a size limit from `fetch.max.bytes`.
> > >>> If the records at offset "0" are bigger than the records at offset
> > >>> "110999", then a FetchResponse returns fewer records.
> > >>>
> > >>> Please correct me if I misunderstood anything.
> > >>>
> > >>> Thanks,
> > >>> PoAn
> > >>>
> > >>> > On Nov 24, 2024, at 2:09 PM, giri mungi <girimung...@gmail.com> wrote:
> > >>> >
> > >>> > Hi Team,
> > >>> >
> > >>> > Good day to you.
> > >>> >
> > >>> > I am Giridhar. I need your suggestions on Kafka performance
> > >>> > improvement, please.
> > >>> >
> > >>> > *Scenario: The user gives an offset as input, and based on that
> > >>> > offset we need to return the next 1000 messages from the Kafka
> > >>> > topic, plus the next offset. The Kafka topic contains only one
> > >>> > partition.*
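> > >>> > A sketch of the behaviour we need (readOffset is the
> > >>> > user-supplied offset and waitTime is our overall time budget;
> > >>> > both come from our service code):
> > >>> >
> > >>> >     consumer.seek(topicPartition, readOffset);
> > >>> >     List<String> messages = new ArrayList<>();
> > >>> >     long deadline = System.currentTimeMillis() + waitTime;
> > >>> >     while (messages.size() < 1000
> > >>> >             && System.currentTimeMillis() < deadline) {
> > >>> >         for (ConsumerRecord<String, String> record :
> > >>> >                 consumer.poll(Duration.ofMillis(500))) {
> > >>> >             messages.add(record.value());
> > >>> >             readOffset = record.offset() + 1;  // next offset for the user
> > >>> >         }
> > >>> >     }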
> > >>> >
> > >>> > We are trying to migrate from old Kafka to new Kafka. In the old
> > >>> > Kafka we were using code like:
> > >>> >
> > >>> > *Old code (kafka-clients 0.8.1):*
> > >>> >
> > >>> >     FetchRequest req = new FetchRequestBuilder().clientId(clientName)
> > >>> >             .addFetch(a_topic, a_partition, readOffset, fetchSize)
> > >>> >             .build();
> > >>> >     FetchResponse fetchResponse = consumer.fetch(req);
> > >>> >     ByteBufferMessageSet set = fetchResponse.messageSet(a_topic, a_partition);
> > >>> >
> > >>> > This code is very fast; we are trying to achieve the same with the
> > >>> > KafkaConsumer API and seeing slowness.
> > >>> >
> > >>> > *New KafkaConsumer code (kafka-clients 3.6.1):*
> > >>> >
> > >>> >     TopicPartition topicPartition = new TopicPartition("Test", 0);
> > >>> >     try (KafkaConsumer<String, String> consumer =
> > >>> >             KafkaConsumerFactory.createConsumer(clientName, fetchSize)) {
> > >>> >         consumer.assign(Collections.singletonList(topicPartition));
> > >>> >         consumer.seekToBeginning(Collections.singletonList(topicPartition));
> > >>> >         long kafkaEarliestOffset = consumer.position(topicPartition);
> > >>> >         consumer.seek(topicPartition, readOffset);
> > >>> >         do {
> > >>> >             ConsumerRecords<String, String> records =
> > >>> >                     consumer.poll(Duration.ofMillis(1500));
> > >>> >             // ... process records and set 'end' when done ...
> > >>> >         } while (!end);
> > >>> >     }
> > >>> >
> > >>> >     public static KafkaConsumer<String, String> createConsumer(String clientName, int fetchSize) {
> > >>> >         Properties props = new Properties();
> > >>> >         String kafkaBrokerStr = Config.getConsumerPropValue("kafkabrokerlist");
> > >>> >         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerStr);
> > >>> >         props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> > >>> >                 StringDeserializer.class.getName());
> > >>> >         props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> > >>> >                 StringDeserializer.class.getName());
> > >>> >         props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
> > >>> >         props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
> > >>> >         props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> > >>> >         props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
> > >>> >         props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
> > >>> >         props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
> > >>> >         return new KafkaConsumer<String, String>(props);
> > >>> >     }
> > >>> >
> > >>> > *Code explanation: Fetching records is taking time for the first poll.*
> > >>> >
> > >>> > Poll Records Count: 500 diff: 1284
> > >>> > Poll Records Count: 500 diff: 3
> > >>> >
> > >>> > For the first 500 records it took 1284 ms; the next 500 records took 3 ms.
> > >>> >
> > >>> > *Why this much difference? I would like to improve the first poll time.*
> > >>> >
> > >>> > 1) How can I fetch the first 500 records in less time?
> > >>> >
> > >>> > *I am also seeing one strange issue. My Kafka topic, which has one
> > >>> > partition, contains some 5 lakh (500,000) records.* *The starting
> > >>> > records take more time to fetch from Kafka.*
> > >>> >
> > >>> > For example, if I give "0" as the input offset, it takes about 6
> > >>> > seconds (see below) and does not even return 500 records; each poll
> > >>> > gets only around 290 records and takes a lot of time. Why is this
> > >>> > happening and how can I avoid it?
> > >>> >
> > >>> > Poll Records Count :292 Time taken :1227 ms
> > >>> > Poll Records Count :292 Time taken :1181 ms
> > >>> > Poll Records Count :296 Time taken :1234 ms
> > >>> > Poll Records Count :292 Time taken :1133 ms
> > >>> >
> > >>> > *If I give the offset as 110999, it is somewhat faster and each poll
> > >>> > returns 500 records. Why this difference?*
> > >>> >
> > >>> > Poll Records Count :500 Time taken :1284 ms
> > >>> > Poll Records Count :500 Time taken :3 ms
> > >>> >
> > >>> > Please give your suggestions on this.
> > >>> >
> > >>> > Regards,
> > >>> > Giridar