Hi,

All of them are from the same consumer in each poll.
Before polling, we set the offset to the user-supplied offset and try to
consume the next 1000 messages.

Thanks,
Giridhar

On Sun, 24 Nov 2024 at 8:40 PM, Chia-Ping Tsai <chia7...@gmail.com> wrote:

> hi
>
> > Poll Records Count :0 diff :2004
> > Poll Records Count :500 diff :943
>
> Are they from the same consumer in each poll? Or are they based on
> different "offset" and separate consumer's poll?
>
> thanks,
> Chia-Ping
>
>
> On Sun, 24 Nov 2024 at 8:51 PM, giri mungi <girimung...@gmail.com> wrote:
>
> > Do i need to check any settings in the kafka server level?
> >
> > On Sun, Nov 24, 2024 at 6:19 PM giri mungi <girimung...@gmail.com> wrote:
> >
> > > Hi I have set the below properties as below:
> > >
> > > props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,"1750000");
> > > props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
> > > props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "1500");
> > > props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1750000");
> > >
> > > Poll Records Count :0 diff :2004
> > > Poll Records Count :500 diff :943
> > > Poll Records Count :0 diff :2000
> > > Poll Records Count :44 diff :3
> > > Poll Records Count :500 diff :205
> > > Poll Records Count :488 diff :1978
> > >
> > >
> > > if i set max records as 500 or 1000 then sometimes i am getting count as
> > > "0" and taking max wait ms time which is impacting the overall response
> > > time..
> > >
> > > Even though I have mentioned *fetch_min_bytes & max_poll_records iam
> > > getting "0" records which is impacting the overall response time.*
> > >
> > > How to avoid this please.
> > >
> > > please suggest this.
> > >
> > > Thanks,
> > > Giridar
> > >
> > >
> > >
> > > On Sun, Nov 24, 2024 at 5:26 PM Chia-Ping Tsai <chia7...@apache.org> wrote:
> > >
> > >> hi Giridar
> > >>
> > >> It seems your use case involves random reads, and you expect the
> > >> consumer to return 1000 records from server at once.
> > >> Therefore, you could increase the wait time (fetch.max.wait.ms) and
> > >> fetch size (fetch.min.bytes) to receive a larger response with as
> > >> many records as possible.
> > >>
> > >> A suitable value for fetch.min.bytes might be 'average size of
> > >> records * 1000'. You may also need to increase
> > >> max.partition.fetch.bytes. For fetch.max.wait.ms, a value of 1500 ms
> > >> might match your expectations.
> > >>
> > >> Best,
> > >> Chia-Ping
> > >>
> > >>
> > >> On 2024/11/24 10:55:23 giri mungi wrote:
> > >> > Hi Yang,
> > >> >
> > >> > *This is the old code which is perfectly doing fine and returning
> > >> > less than 3 seconds for all 1000 records.*
> > >> >
> > >> > do {
> > >> >     FetchRequest req = new FetchRequestBuilder().clientId(clientName)
> > >> >             .addFetch(a_topic, a_partition, readOffset, fetchSize)
> > >> >             .build();
> > >> >
> > >> >     FetchResponse fetchResponse = consumer.fetch(req);
> > >> >
> > >> >
> > >> >     JSONObject obj = null;
> > >> >     ByteBufferMessageSet set = fetchResponse.messageSet(a_topic,
> > >> >             a_partition);
> > >> >
> > >> >     for (MessageAndOffset messageAndOffset : set) {
> > >> >         String message = null;
> > >> >         if (messageAndOffset.offset() < readOffset) {
> > >> >             log.warn("Found an old offset: {} ,Expecting: {}",
> > >> >                     messageAndOffset.offset(), readOffset);
> > >> >             continue;
> > >> >         }
> > >> >
> > >> >         ByteBuffer payload = messageAndOffset.message().payload();
> > >> >
> > >> >         byte[] bytes = new byte[payload.limit()];
> > >> >         payload.get(bytes);
> > >> >         try {
> > >> >             message = new String(bytes, "UTF-8");
> > >> >         } catch (UnsupportedEncodingException ue) {
> > >> >             log.warn(ue.getMessage(), ue);
> > >> >             message = new String(bytes);
> > >> >         }
> > >> >
> > >> >         obj = new JSONObject(message);
> > >> >         msglist.add(new JSONObject(message));
> > >> >     }
> > >> >     readOffset = messageAndOffset.nextOffset();
> > >> >
> > >> >     if (msglist.size() >= Math.round(limit /
> > >> >             inputReq.getApplicationArea().getReqInfo().size())
> > >> >             || (endTime - startTime) >= waitTime) {
> > >> >         log.info(
> > >> >             "Wait condition has been met... exiting the fetch loop. recordCount - {}, time exhausted - {} ms.",
> > >> >             msglist.size(), (endTime - startTime));
> > >> >         end = true;
> > >> >     }
> > >> >
> > >> > } while (!end);
> > >> >
> > >> >
> > >> > *The new code is taking 8 seconds.. which is more than double the time*
> > >> >
> > >> > TopicPartition topicPartition = new TopicPartition("Test", 0);
> > >> > consumer.seekToBeginning(Collections.singletonList(topicPartition));
> > >> > long kafkaEarliestOffset = consumer.position(topicPartition);
> > >> > try (KafkaConsumer<String, String> consumer =
> > >> >         KafkaConsumerFactory.createConsumer(clientName, fetchSize)) {
> > >> >     consumer.partitionsFor(topicName);
> > >> >     consumer.assign(Collections.singletonList(topicPartition));
> > >> >     consumer.seek(topicPartition, readOffset);
> > >> >     do {
> > >> >         ConsumerRecords<String, String> records =
> > >> >                 consumer.poll(Duration.ofMillis(1500));
> > >> >     } while (!end)
> > >> >
> > >> > public static KafkaConsumer<String,String> createConsumer(String
> > >> > clientName,int fetchSize) {
> > >> >     Properties props = new Properties();
> > >> >     String kafkaBrokerStr =
> > >> >             Config.getConsumerPropValue("kafkabrokerslist");
> > >> >     String groupId = Config.getConsumerPropValue("group.id");
> > >> >     props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> > >> >             StringDeserializer.class.getName());
> > >> >     props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> > >> >             StringDeserializer.class.getName());
> > >> >     props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
> > >> >     props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
> > >> >             "false");
> > >> >     props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
> > >> >             "earliest");
> > >> >     props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
> > >> >     props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"500");
> > >> >     props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
> > >> >     return new KafkaConsumer<String,String>(props);
> > >> > }
> > >> >
> > >> > How to improve the performance on this.
> > >> >
> > >> > On Sun, Nov 24, 2024 at 4:11 PM giri mungi <girimung...@gmail.com> wrote:
> > >> >
> > >> > > Hi Yang,
> > >> > >
> > >> > > Can i get the records from kafka as bytes or compression form so that i
> > >> > > will take less time from kafka.
> > >> > >
> > >> > > I can build messages from those bytes.Is that possible?
> > >> > >
> > >> > > Can you please give suggestions on this.
> > >> > >
> > >> > > Thanks,
> > >> > > Giridar
> > >> > >
> > >> > > On Sun, Nov 24, 2024 at 3:50 PM giri mungi <girimung...@gmail.com> wrote:
> > >> > >
> > >> > >> Hi Yang,
> > >> > >>
> > >> > >> Thanks for your reply.
> > >> > >>
> > >> > >> Now what should I do to improve my performance?Because the old kafka code
> > >> > >> was good in performance
> > >> > >>
> > >> > >> These are the properties:
> > >> > >>
> > >> > >> props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
> > >> > >> props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
> > >> > >> props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> > >> > >> props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
> > >> > >> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"500");
> > >> > >> props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
> > >> > >>
> > >> > >> How to decrease this time ,plz suggest .
> > >> > >>
> > >> > >> Thanks,
> > >> > >> Giridar
> > >> > >>
> > >> > >> On Sun, Nov 24, 2024 at 3:15 PM PoAn Yang <yangp...@gmail.com> wrote:
> > >> > >>
> > >> > >>> Hi Giridar,
> > >> > >>>
> > >> > >>> > *Code explanation:Fetching records is taking time for the first poll.*
> > >> > >>> > Poll Records Count: 500 diff: 1284
> > >> > >>> > Poll Records Count: 500 diff: 3
> > >> > >>> >
> > >> > >>> > For the first 500 records it took 1284 ms and next 500 records it took 4 ms
> > >> > >>> >
> > >> > >>> > *Why this much difference? I would like to improve the performance of the
> > >> > >>> > first poll time?*
> > >> > >>>
> > >> > >>> IIRC, a consumer has FetchBuffer (cache).
> > >> > >>> If a FetchResponse can return more records than `max.poll.records`, the
> > >> > >>> extra records will be stored in FetchBuffer.
> > >> > >>> That is why the second poll can get data in short time.
> > >> > >>> IMO, we don’t need to balance each poll time, because that means we
> > >> > >>> don’t want local cache.
> > >> > >>>
> > >> > >>> > For example if i give "0" as input offset and it is taking time as below (6
> > >> > >>> > seconds) and not getting 500 records also,it is getting only 200 records
> > >> > >>> > per poll and taking lot of time...why this is happening and how to avoid
> > >> > >>> > this.
> > >> > >>> >
> > >> > >>> > Poll Records Count :292 Time taken :1227 ms
> > >> > >>> > Poll Records Count :292 Time taken :1181 ms
> > >> > >>> > Poll Records Count :296 Time taken:1234 ms
> > >> > >>> > Poll Records Count :292 Time taken:1133 ms
> > >> > >>> >
> > >> > >>> > *If I give an offset as 110999 and it is getting some fast and records
> > >> > >>> > getting as 500 each..Why this difference please.*
> > >> > >>> >
> > >> > >>> > Poll Records Count :500 Time taken:1284 ms
> > >> > >>> > Poll Records Count :500 Time taken:3 ms
> > >> > >>>
> > >> > >>> IIUC, a FetchRequest has a limitation from `fetch.max.bytes`.
> > >> > >>> If the record size from offset “0” is bigger than from offset “110999”,
> > >> > >>> then a FetchResponse returns less records.
> > >> > >>>
> > >> > >>> Please correct me if I misunderstood anything.
> > >> > >>>
> > >> > >>> Thanks,
> > >> > >>> PoAn
> > >> > >>>
> > >> > >>> > On Nov 24, 2024, at 2:09 PM, giri mungi <girimung...@gmail.com> wrote:
> > >> > >>> >
> > >> > >>> > Hi Team,
> > >> > >>> >
> > >> > >>> > Good day to you.
> > >> > >>> >
> > >> > >>> > Iam Giridhar.I need your suggestions in kafka
> > >> > >>> > performance improvement please.
> > >> > >>> >
> > >> > >>> > *Scenario is: The user will give the offset as input and based on the
> > >> > >>> > offset we need to give the next 1000 messages from kafka topic and next
> > >> > >>> > offset.The kafka topic contains only one partition.*
> > >> > >>> >
> > >> > >>> > We are trying to migrate from old kafka to new kafka.In the old kafka we
> > >> > >>> > were using code like:
> > >> > >>> >
> > >> > >>> > *old code(kafka clients 0.8.1):*
> > >> > >>> >
> > >> > >>> > FetchRequest req = new
> > >> > >>> >         FetchRequestBuilder().clientId(clientName).addFetch(a_topic,
> > >> > >>> >         a_partition, readOffset, fetchSize).build();
> > >> > >>> > FetchResponse fetchResponse = consumer.fetch(req);
> > >> > >>> > ByteBufferMessageSet set = fetchResponse.messageSet(a_topic, a_partition);
> > >> > >>> >
> > >> > >>> > This code is super fast and same we are trying to achieve using
> > >> > >>> > KafkaConsumer API and getting slowness
> > >> > >>> >
> > >> > >>> > *New kafkaconsumer code is using(kafka clients 3.6.1)*
> > >> > >>> >
> > >> > >>> > TopicPartition topicPartition = new TopicPartition("Test", 0);
> > >> > >>> > consumer.seekToBeginning(Collections.singletonList(topicPartition));
> > >> > >>> > long kafkaEarliestOffset = consumer.position(topicPartition);
> > >> > >>> > try (KafkaConsumer<String, String> consumer =
> > >> > >>> >         KafkaConsumerFactory.createConsumer(clientName, fetchSize)) {
> > >> > >>> >     consumer.assign(Collections.singletonList(topicPartition));
> > >> > >>> >     consumer.seek(topicPartition, readOffset);
> > >> > >>> >     do {
> > >> > >>> >         ConsumerRecords<String, String> records =
> > >> > >>> >                 consumer.poll(Duration.ofMillis(1500));
> > >> > >>> >     } while (!end)
> > >> > >>> >
> > >> > >>> > public static KafkaConsumer<String,String> createConsumer(String
> > >> > >>> > clientName,int fetchSize) {
> > >> > >>> >     Properties props = new Properties();
> > >> > >>> >     String kafkaBrokerStr =
> > >> > >>> >             Config.getConsumerPropValue("kafkabrokerlist");
> > >> > >>> >     props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> > >> > >>> >             StringDeserializer.class.getName());
> > >> > >>> >     props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> > >> > >>> >             StringDeserializer.class.getName());
> > >> > >>> >     props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
> > >> > >>> >     props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
> > >> > >>> >             "false");
> > >> > >>> >     props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
> > >> > >>> >             "earliest");
> > >> > >>> >     props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
> > >> > >>> >     props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"500");
> > >> > >>> >     props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
> > >> > >>> >     return new KafkaConsumer<String,String>(props);
> > >> > >>> > }
> > >> > >>> >
> > >> > >>> > *Code explanation:Fetching records is taking time for the first poll.*
> > >> > >>> > Poll Records Count: 500 diff: 1284
> > >> > >>> > Poll Records Count: 500 diff: 3
> > >> > >>> >
> > >> > >>> > For the first 500 records it took 1284 ms and next 500 records it took 4 ms
> > >> > >>> >
> > >> > >>> > *Why this much difference?
> > >> > >>> > I would like to improve the performance of the
> > >> > >>> > first poll time?*
> > >> > >>> >
> > >> > >>> >
> > >> > >>> > 1) How to fetch first 500 records in less time
> > >> > >>> >
> > >> > >>> > *I am also seeing one strange issue.My kafka topic which has one partition
> > >> > >>> > contains some 5 lakh records*.*The starting records take more time to fetch
> > >> > >>> > from kafka.*
> > >> > >>> >
> > >> > >>> > For example if i give "0" as input offset and it is taking time as below (6
> > >> > >>> > seconds) and not getting 500 records also,it is getting only 200 records
> > >> > >>> > per poll and taking lot of time...why this is happening and how to avoid
> > >> > >>> > this.
> > >> > >>> >
> > >> > >>> > Poll Records Count :292 Time taken :1227 ms
> > >> > >>> > Poll Records Count :292 Time taken :1181 ms
> > >> > >>> > Poll Records Count :296 Time taken:1234 ms
> > >> > >>> > Poll Records Count :292 Time taken:1133 ms
> > >> > >>> >
> > >> > >>> > *If I give an offset as 110999 and it is getting some fast and records
> > >> > >>> > getting as 500 each..Why this difference please.*
> > >> > >>> >
> > >> > >>> > Poll Records Count :500 Time taken:1284 ms
> > >> > >>> > Poll Records Count :500 Time taken:3 ms
> > >> > >>> >
> > >> > >>> >
> > >> > >>> >
> > >> > >>> > Please give your suggestion on this.
> > >> > >>> >
> > >> > >>> > Regards,
> > >> > >>> > Giridar
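
For reference, the empty polls reported in this thread (`Poll Records Count :0`) suggest that the calling loop has to tolerate a poll that returns nothing and keep going until it has the requested 1000 records or runs out of time. Below is a minimal sketch of such a loop, under stated assumptions: `BoundedFetch`, `collect`, and the `Supplier` parameter are hypothetical names, not part of the Kafka client or of the code posted above. The supplier stands in for `consumer.poll(Duration)` so that the example runs without a broker, and the batch sizes in `main` are made up to resemble the logged counts.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

public class BoundedFetch {

    /**
     * Collects up to `limit` records by calling `poll` repeatedly.
     * `poll` is a stand-in (assumption) for consumer.poll(Duration);
     * it may return an empty batch, which is treated as "try again".
     * The loop ends when the limit is reached or `maxWaitMs` has elapsed.
     */
    public static List<String> collect(Supplier<List<String>> poll, int limit, long maxWaitMs) {
        List<String> out = new ArrayList<>(limit);
        long deadline = System.nanoTime() + maxWaitMs * 1_000_000L;
        while (out.size() < limit && deadline - System.nanoTime() > 0) {
            List<String> batch = poll.get();
            for (String record : batch) {
                if (out.size() == limit) {
                    break; // ignore anything beyond the requested count
                }
                out.add(record);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Fake batches: empty, 500, empty, 44, 500 (illustrative values only).
        List<List<String>> fake = new ArrayList<>();
        for (int n : new int[] {0, 500, 0, 44, 500}) {
            fake.add(Collections.nCopies(n, "record"));
        }
        Iterator<List<String>> it = fake.iterator();
        Supplier<List<String>> fakePoll =
                () -> it.hasNext() ? it.next() : Collections.<String>emptyList();

        List<String> result = collect(fakePoll, 1000, 3000);
        System.out.println("collected " + result.size() + " records");
    }
}
```

Records beyond the limit are discarded in this sketch, so a caller would need to seek again from the user-supplied offset on the next request, as the thread already describes.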