Do I need to check any settings at the Kafka server level?

On Sun, Nov 24, 2024 at 6:19 PM giri mungi <girimung...@gmail.com> wrote:
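For the server-level question: the broker-side limits that cap a single fetch can be inspected with the Admin API rather than guessed. A minimal sketch, assuming kafka-clients 3.x; the bootstrap address and broker id "0" are placeholders:

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.common.config.ConfigResource;

    public class BrokerFetchSettings {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            try (Admin admin = Admin.create(props)) {
                ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0"); // assumed broker id
                Map<ConfigResource, Config> configs =
                        admin.describeConfigs(Collections.singletonList(broker)).all().get();
                // Server-side caps that bound how much data one fetch can return.
                for (String name : new String[] {
                        "message.max.bytes", "fetch.max.bytes", "replica.fetch.max.bytes"}) {
                    System.out.println(name + " = " + configs.get(broker).get(name).value());
                }
            }
        }
    }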
> Hi, I have set the properties as below:
>
> props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1750000");
> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
> props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "1500");
> props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1750000");
>
> Poll Records Count :0   diff :2004
> Poll Records Count :500 diff :943
> Poll Records Count :0   diff :2000
> Poll Records Count :44  diff :3
> Poll Records Count :500 diff :205
> Poll Records Count :488 diff :1978
>
> If I set max records to 500 or 1000, then some polls return a count of
> "0" and take the full max wait time, which hurts the overall response
> time.
>
> Even though I have set *fetch.min.bytes and max.poll.records, I am
> getting "0" records, which is impacting the overall response time.*
>
> How can I avoid this? Please suggest.
>
> Thanks,
> Giridar
>
> On Sun, Nov 24, 2024 at 5:26 PM Chia-Ping Tsai <chia7...@apache.org> wrote:
>
>> Hi Giridar,
>>
>> It seems your use case involves random reads, and you expect the consumer
>> to return 1000 records from the server at once. Therefore, you could
>> increase the wait time (fetch.max.wait.ms) and fetch size (fetch.min.bytes)
>> to receive a larger response with as many records as possible.
>>
>> A suitable value for fetch.min.bytes might be 'average size of records *
>> 1000'. You may also need to increase max.partition.fetch.bytes. For
>> fetch.max.wait.ms, a value of 1500 ms might match your expectations.
>>
>> Best,
>> Chia-Ping
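Expressed as consumer properties, that advice might look like the sketch below. The 1750000 figure assumes an average record size of roughly 1750 bytes times 1000 records; substitute your measured average. `props` is the Properties object handed to the KafkaConsumer:

    props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1750000");           // ~ avg record size * 1000
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1750000"); // raise the per-partition cap to match
    props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "1500");            // let the broker wait up to 1.5 s to fill the fetch
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");             // hand back up to 1000 records per poll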
>> On 2024/11/24 10:55:23 giri mungi wrote:
>> > Hi Yang,
>> >
>> > *This is the old code, which performs fine and returns all 1000 records
>> > in under 3 seconds:*
>> >
>> > do {
>> >     FetchRequest req = new FetchRequestBuilder().clientId(clientName)
>> >             .addFetch(a_topic, a_partition, readOffset, fetchSize)
>> >             .build();
>> >
>> >     FetchResponse fetchResponse = consumer.fetch(req);
>> >
>> >     ByteBufferMessageSet set = fetchResponse.messageSet(a_topic, a_partition);
>> >
>> >     for (MessageAndOffset messageAndOffset : set) {
>> >         String message = null;
>> >         if (messageAndOffset.offset() < readOffset) {
>> >             log.warn("Found an old offset: {} ,Expecting: {}",
>> >                     messageAndOffset.offset(), readOffset);
>> >             continue;
>> >         }
>> >
>> >         ByteBuffer payload = messageAndOffset.message().payload();
>> >         byte[] bytes = new byte[payload.limit()];
>> >         payload.get(bytes);
>> >         try {
>> >             message = new String(bytes, "UTF-8");
>> >         } catch (UnsupportedEncodingException ue) {
>> >             log.warn(ue.getMessage(), ue);
>> >             message = new String(bytes);
>> >         }
>> >
>> >         msglist.add(new JSONObject(message));
>> >         readOffset = messageAndOffset.nextOffset();
>> >     }
>> >
>> >     if (msglist.size() >= Math.round(limit
>> >             / inputReq.getApplicationArea().getReqInfo().size())
>> >             || (endTime - startTime) >= waitTime) {
>> >         log.info("Wait condition has been met... exiting the fetch loop. "
>> >                 + "recordCount - {}, time exhausted - {} ms.",
>> >                 msglist.size(), (endTime - startTime));
>> >         end = true;
>> >     }
>> > } while (!end);
>> >
>> > *The new code takes 8 seconds, which is more than double the time:*
>> >
>> > try (KafkaConsumer<String, String> consumer =
>> >         KafkaConsumerFactory.createConsumer(clientName, fetchSize)) {
>> >     TopicPartition topicPartition = new TopicPartition("Test", 0);
>> >     consumer.partitionsFor(topicName);
>> >     consumer.assign(Collections.singletonList(topicPartition));
>> >     consumer.seekToBeginning(Collections.singletonList(topicPartition));
>> >     long kafkaEarliestOffset = consumer.position(topicPartition);
>> >     consumer.seek(topicPartition, readOffset);
>> >     do {
>> >         ConsumerRecords<String, String> records =
>> >                 consumer.poll(Duration.ofMillis(1500));
>> >     } while (!end);
>> > }
>> >
>> > public static KafkaConsumer<String, String> createConsumer(String clientName, int fetchSize) {
>> >     Properties props = new Properties();
>> >     String kafkaBrokerStr = Config.getConsumerPropValue("kafkabrokerslist");
>> >     String groupId = Config.getConsumerPropValue("group.id");
>> >     props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerStr);
>> >     props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>> >             StringDeserializer.class.getName());
>> >     props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>> >             StringDeserializer.class.getName());
>> >     props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
>> >     props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
>> >     props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>> >     props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
>> >     props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
>> >     props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
>> >     return new KafkaConsumer<String, String>(props);
>> > }
>> >
>> > How can I improve the performance of this?
>> >
>> > On Sun, Nov 24, 2024 at 4:11 PM giri mungi <girimung...@gmail.com> wrote:
>> >
>> > > Hi Yang,
>> > >
>> > > Can I get the records from Kafka as bytes or in compressed form, so
>> > > that fetching from Kafka takes less time?
>> > >
>> > > I can build the messages from those bytes. Is that possible?
>> > >
>> > > Can you please give suggestions on this?
>> > >
>> > > Thanks,
>> > > Giridar
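On the raw-bytes question: the stock client ships a ByteArrayDeserializer, so the consumer can hand back byte[] and defer String/JSON construction until a record is actually needed. Note the client still decompresses batches internally; this only skips the per-record UTF-8 decode. A hedged sketch; the topic name, offset, and bootstrap address are placeholders:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class RawBytesConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                TopicPartition tp = new TopicPartition("Test", 0);
                consumer.assign(Collections.singletonList(tp));
                consumer.seek(tp, 0L);
                for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(1500))) {
                    byte[] value = record.value(); // decode to String/JSON only when required
                }
            }
        }
    }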
>> > > On Sun, Nov 24, 2024 at 3:50 PM giri mungi <girimung...@gmail.com> wrote:
>> > >
>> > >> Hi Yang,
>> > >>
>> > >> Thanks for your reply.
>> > >>
>> > >> What should I do now to improve performance? The old Kafka code
>> > >> performed well.
>> > >>
>> > >> These are the properties:
>> > >>
>> > >> props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
>> > >> props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
>> > >> props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>> > >> props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
>> > >> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
>> > >> props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
>> > >>
>> > >> How can I decrease this time? Please suggest.
>> > >>
>> > >> Thanks,
>> > >> Giridar
>> > >>
>> > >> On Sun, Nov 24, 2024 at 3:15 PM PoAn Yang <yangp...@gmail.com> wrote:
>> > >>
>> > >>> Hi Giridar,
>> > >>>
>> > >>> > *Code explanation: fetching records takes time for the first poll.*
>> > >>> > Poll Records Count: 500 diff: 1284
>> > >>> > Poll Records Count: 500 diff: 3
>> > >>> >
>> > >>> > For the first 500 records it took 1284 ms, and the next 500 records
>> > >>> > took 4 ms.
>> > >>> >
>> > >>> > *Why is there this much difference? I would like to improve the
>> > >>> > performance of the first poll.*
>> > >>>
>> > >>> IIRC, the consumer has a FetchBuffer (cache).
>> > >>> If a FetchResponse can return more records than `max.poll.records`,
>> > >>> the extra records are stored in the FetchBuffer.
>> > >>> That is why the second poll can return data in a short time.
>> > >>> IMO, we don't need to balance each poll time, because that would mean
>> > >>> giving up the local cache.
>> > >>>
>> > >>> > For example, if I give "0" as the input offset, it takes the time
>> > >>> > shown below (about 6 seconds), does not even reach 500 records, and
>> > >>> > gets only ~290 records per poll. Why is this happening, and how can
>> > >>> > I avoid it?
>> > >>> >
>> > >>> > Poll Records Count :292 Time taken :1227 ms
>> > >>> > Poll Records Count :292 Time taken :1181 ms
>> > >>> > Poll Records Count :296 Time taken :1234 ms
>> > >>> > Poll Records Count :292 Time taken :1133 ms
>> > >>> >
>> > >>> > *If I give an offset of 110999, it is faster and gets 500 records
>> > >>> > per poll. Why this difference, please?*
>> > >>> >
>> > >>> > Poll Records Count :500 Time taken :1284 ms
>> > >>> > Poll Records Count :500 Time taken :3 ms
>> > >>>
>> > >>> IIUC, a FetchRequest is limited by `fetch.max.bytes`.
>> > >>> If the records starting at offset "0" are bigger than those starting
>> > >>> at offset "110999", then a FetchResponse returns fewer records.
>> > >>>
>> > >>> Please correct me if I misunderstood anything.
>> > >>>
>> > >>> Thanks,
>> > >>> PoAn
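The prefetch effect PoAn describes can be observed by timing consecutive polls. A small sketch, assuming the consumer has been assigned and positioned as in the snippets above:

    for (int i = 0; i < 5; i++) {
        long start = System.nanoTime();
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1500));
        long tookMs = (System.nanoTime() - start) / 1_000_000;
        // The first iteration pays the network round trip; later iterations are
        // often served from the consumer's internal fetch buffer.
        System.out.println("Poll Records Count :" + records.count() + " Time taken :" + tookMs + " ms");
    }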
>> > >>> > >> > >>> > *Scenario is: The user will give the offset as input and based on >> the >> > >>> > offset we need to give the next 1000 messages from kafka topic >> and next >> > >>> > offset.The kafka topic contains only one partition.* >> > >>> > >> > >>> > We are trying to migrate from old kafka to new kafka.In the old >> kafka >> > >>> we >> > >>> > were using code like: >> > >>> > >> > >>> > *old code(kafka clients 0.8 .1):* >> > >>> > >> > >>> > FetchRequest req = new >> > >>> > FetchRequestBuilder().clientId(clientName).addFetch(a_topic, >> > >>> > a_partition, readOffset, fetchSize).build(); >> > >>> > FetchResponse fetchResponse = consumer.fetch(req); >> > >>> > ByteBufferMessageSet set = fetchResponse.messageSet(a_topic, >> > >>> a_partition); >> > >>> > >> > >>> > This code is super fast and same we are trying to achieve using >> > >>> > KafkaConsumer API and getting slowness >> > >>> > >> > >>> > *New kafkaconsumer code is using(kafka clients 3.6 .1)* >> > >>> > >> > >>> > TopicPartition topicPartition = new TopicPartition("Test", 0); >> > >>> > >> consumer.seekToBeginning(Collections.singletonList(topicPartition)); >> > >>> > long kafkaEarliestOffset = consumer.position(topicPartition); >> > >>> > try (KafkaConsumer < String, String > consumer = >> > >>> > KafkaConsumerFactory.createConsumer(clientName, fetchSize)) { >> > >>> > consumer.assign(Collections.singletonList(topicPartition)); >> > >>> > consumer.seek(topicPartition, readOffset); >> > >>> > do { >> > >>> > ConsumerRecords < String, String > records = >> > >>> > consumer.poll(Duration.ofMillis(1500)); >> > >>> > } while (!end) >> > >>> > >> > >>> > public static KafkaConsumer<String,String> createConsumer(String >> > >>> > clientName,int fetchSize) { >> > >>> > Properties props = new Properties(); >> > >>> > String kafkaBrokerStr = >> > >>> > Config.getConsumerPropValue("kafkabrokerlist"); >> > >>> > >> > >>> props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, >> > >>> > StringDeserializer.class.getName()); >> > >>> > >> > >>> > props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, >> > >>> > StringDeserializer.class.getName()); >> > >>> > props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, ""); >> > >>> > >> props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, >> > >>> > "false"); >> > >>> > >> props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, >> > >>> > "earliest"); >> > >>> > props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, >> "1024"); >> > >>> > >> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"500"); >> > >>> > props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, >> "500"); >> > >>> > return new KafkaConsumer<String,String>(props); >> > >>> > } >> > >>> > >> > >>> > *Code explanation:Fetching records is taking time for the first >> poll.* >> > >>> > Poll Records Count: 500 diff: 1284 >> > >>> > Poll Records Count: 500 diff: 3 >> > >>> > >> > >>> > For the first 500 records it took 1284 ms and next 500 records it >> took >> > >>> 4 ms >> > >>> > >> > >>> > *Why this much difference? 
>> > >>> >
>> > >>> > *Code explanation: fetching records takes time for the first poll.*
>> > >>> > Poll Records Count: 500 diff: 1284
>> > >>> > Poll Records Count: 500 diff: 3
>> > >>> >
>> > >>> > For the first 500 records it took 1284 ms, and the next 500 records
>> > >>> > took 4 ms.
>> > >>> >
>> > >>> > *Why is there this much difference? I would like to improve the
>> > >>> > performance of the first poll.*
>> > >>> >
>> > >>> > 1) How can we fetch the first 500 records in less time?
>> > >>> >
>> > >>> > *I am also seeing one strange issue. My Kafka topic, which has one
>> > >>> > partition, contains some 5 lakh (500,000) records.* *The earliest
>> > >>> > records take more time to fetch from Kafka.*
>> > >>> >
>> > >>> > For example, if I give "0" as the input offset, it takes the time
>> > >>> > shown below (about 6 seconds), does not even reach 500 records, and
>> > >>> > gets only ~290 records per poll. Why is this happening, and how can
>> > >>> > I avoid it?
>> > >>> >
>> > >>> > Poll Records Count :292 Time taken :1227 ms
>> > >>> > Poll Records Count :292 Time taken :1181 ms
>> > >>> > Poll Records Count :296 Time taken :1234 ms
>> > >>> > Poll Records Count :292 Time taken :1133 ms
>> > >>> >
>> > >>> > *If I give an offset of 110999, it is faster and gets 500 records
>> > >>> > per poll. Why this difference, please?*
>> > >>> >
>> > >>> > Poll Records Count :500 Time taken :1284 ms
>> > >>> > Poll Records Count :500 Time taken :3 ms
>> > >>> >
>> > >>> > Please give your suggestions on this.
>> > >>> >
>> > >>> > Regards,
>> > >>> > Giridar
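One way to test PoAn's record-size hypothesis, and to feed Chia-Ping's "fetch.min.bytes = average size of records * 1000" formula, is to sample serializedValueSize() at each starting offset. A sketch, assuming `consumer` and `topicPartition` are set up as in the snippets above:

    long totalBytes = 0;
    int sampled = 0;
    consumer.seek(topicPartition, 0L); // repeat with 110999 and compare the averages
    while (sampled < 1000) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1500));
        if (records.isEmpty()) {
            break; // nothing fetched within the timeout
        }
        for (ConsumerRecord<String, String> r : records) {
            totalBytes += r.serializedValueSize();
            sampled++;
        }
    }
    System.out.println("average value bytes = " + (sampled == 0 ? 0 : totalBytes / sampled));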