Hi all,

Please give suggestions on this. Please help, I am stuck with this.
Thanks,
Giri

On Mon, 25 Nov 2024 at 9:22 AM, giri mungi <girimung...@gmail.com> wrote:

Hi Ping,

1) Yes, the records already exist.
2) Yes, the Kafka broker has also been upgraded to version 3.6. That is why we are upgrading the client code as well.
3) Do I need to compare any settings between the old broker and the new broker?

Please help.

Thanks,
Giri

On Mon, 25 Nov 2024 at 6:45 AM, Chia-Ping Tsai <chia7...@gmail.com> wrote:

Hi,

1) The records you want to read already exist, right?

2) Are all your code changes solely focused on upgrading the client code from version 0.8 to 3.6, or does the broker get upgraded as well?

Thanks,
Chia-Ping

On Mon, Nov 25, 2024 at 1:27 AM, giri mungi <girimung...@gmail.com> wrote:

Hi Ping,

My bad, commitSync is not required; we can ignore that.

I am calculating "diff" as below:

long stime = Calendar.getInstance().getTimeInMillis();
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2000));
long etime = Calendar.getInstance().getTimeInMillis();
log.info("Poll Records Count :{} Diff :{}", records.count(), (etime - stime));

Thanks,
Giri

On Sun, Nov 24, 2024 at 10:48 PM, Chia-Ping Tsai <chia7...@gmail.com> wrote:

Hi Giri,

1. Why do you call `commitSync`? It seems your application does not use the consumer group, since your use case is random reads, right?
2. How do you calculate "diff"?

On Mon, Nov 25, 2024 at 12:50 AM, giri mungi <girimung...@gmail.com> wrote:

Hi Ping,

Please find the details below:

1) The Kafka broker version is 3.6.1.

2) Logic explanation: the code polls messages from Kafka using consumer.poll(Duration.ofMillis(2000)).

Exit condition: the loop exits once the message limit of 1000 is reached; the end flag is then set to true and the loop exits.

boolean end = false;
consumer.seek(topicPartition, readOffset);
do {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2000));
    for (ConsumerRecord<String, String> consumerRecord : records) {
        String message = consumerRecord.value();
        JSONObject obj = new JSONObject(message);
        msglist.add(obj);
    }
    if (msglist.size() >= 1000) {
        end = true;
        consumer.commitSync();
    }
} while (!end);

Thanks,
Giri

On Sun, Nov 24, 2024 at 9:23 PM, Chia-Ping Tsai <chia7...@gmail.com> wrote:

Hi,

1) Could you share the broker version with us?
2) Could you explain how the sample code works? What is the "end"?

```
do {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1500));
} while (!end)
```

Thanks,
Chia-Ping

On Sun, Nov 24, 2024 at 11:15 PM, giri mungi <girimung...@gmail.com> wrote:

Hi,

All of them are from the same consumer in each poll.

Before each poll we set the offset to the user-input offset and try to consume the next 1000 messages.

Thanks,
Giridhar
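For reference, here is a minimal, self-contained sketch of the pattern described above: assign the single partition, seek to the user-supplied offset, and poll until 1000 records are collected or a deadline passes. The bootstrap address, topic name, starting offset, and deadline are illustrative placeholders, not values confirmed in this thread.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class OffsetRangeReader {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        TopicPartition tp = new TopicPartition("Test", 0);
        long readOffset = 0L;                                 // user-supplied starting offset
        int limit = 1000;                                     // records to return per request
        long deadline = System.currentTimeMillis() + 10_000;  // hard time cap for the whole read

        List<String> messages = new ArrayList<>(limit);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // assign() instead of subscribe(): random reads need no group management
            consumer.assign(Collections.singletonList(tp));
            consumer.seek(tp, readOffset);
            while (messages.size() < limit && System.currentTimeMillis() < deadline) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> r : records) {
                    messages.add(r.value());
                    if (messages.size() >= limit) break;
                }
            }
        }
        System.out.printf("collected %d records%n", messages.size());
    }
}
```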
On Sun, 24 Nov 2024 at 8:40 PM, Chia-Ping Tsai <chia7...@gmail.com> wrote:

Hi,

> Poll Records Count :0 diff :2004
> Poll Records Count :500 diff :943

Are these from the same consumer in each poll, or are they based on different "offset" inputs and separate consumers' polls?

Thanks,
Chia-Ping

On Sun, Nov 24, 2024 at 8:51 PM, giri mungi <girimung...@gmail.com> wrote:

Do I need to check any settings at the Kafka server level?

On Sun, Nov 24, 2024 at 6:19 PM, giri mungi <girimung...@gmail.com> wrote:

Hi, I have set the properties as below:

props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1750000");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "1500");
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1750000");

Poll Records Count :0 diff :2004
Poll Records Count :500 diff :943
Poll Records Count :0 diff :2000
Poll Records Count :44 diff :3
Poll Records Count :500 diff :205
Poll Records Count :488 diff :1978

If I set max records to 500 or 1000, then sometimes I get a count of "0" and the poll takes the full max wait time, which hurts the overall response time.

Even though I have set fetch.min.bytes and max.poll.records, I am still getting "0" records on some polls, which hurts the overall response time.

How can I avoid this? Please suggest.

Thanks,
Giridar

On Sun, Nov 24, 2024 at 5:26 PM, Chia-Ping Tsai <chia7...@apache.org> wrote:

Hi Giridar,

It seems your use case involves random reads, and you expect the consumer to return 1000 records from the server at once. Therefore, you could increase the wait time (fetch.max.wait.ms) and fetch size (fetch.min.bytes) to receive a larger response with as many records as possible.

A suitable value for fetch.min.bytes might be "average size of records * 1000". You may also need to increase max.partition.fetch.bytes. For fetch.max.wait.ms, a value of 1500 ms might match your expectations.

Best,
Chia-Ping
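Chia-Ping's sizing advice can be captured in a small helper. This is a sketch under the assumption that you have measured the average record size for your topic yourself; the 1500 ms wait mirrors the value suggested above, and everything else is illustrative.

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public final class FetchTuning {
    // Illustrative sizing for "return ~1000 records per fetch".
    // avgRecordBytes is an assumption: measure it for your topic.
    public static Properties tunedProps(int avgRecordBytes, int targetRecords) {
        Properties props = new Properties();
        int batchBytes = avgRecordBytes * targetRecords;
        // The broker holds the fetch until this many bytes are available...
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, String.valueOf(batchBytes));
        // ...but never waits longer than this.
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "1500");
        // Allow the single partition to return the whole batch in one response.
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, String.valueOf(batchBytes));
        // Let one poll() hand back the whole batch instead of 500-record slices.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(targetRecords));
        return props;
    }
}
```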
On 2024/11/24 10:55:23 giri mungi wrote:

Hi Yang,

This is the old code, which performs perfectly fine and returns all 1000 records in under 3 seconds:

do {
    FetchRequest req = new FetchRequestBuilder().clientId(clientName)
            .addFetch(a_topic, a_partition, readOffset, fetchSize)
            .build();
    FetchResponse fetchResponse = consumer.fetch(req);

    JSONObject obj = null;
    ByteBufferMessageSet set = fetchResponse.messageSet(a_topic, a_partition);

    for (MessageAndOffset messageAndOffset : set) {
        String message = null;
        if (messageAndOffset.offset() < readOffset) {
            log.warn("Found an old offset: {} ,Expecting: {}",
                    messageAndOffset.offset(), readOffset);
            continue;
        }

        ByteBuffer payload = messageAndOffset.message().payload();
        byte[] bytes = new byte[payload.limit()];
        payload.get(bytes);
        try {
            message = new String(bytes, "UTF-8");
        } catch (UnsupportedEncodingException ue) {
            log.warn(ue.getMessage(), ue);
            message = new String(bytes);
        }

        obj = new JSONObject(message);
        msglist.add(obj);
        readOffset = messageAndOffset.nextOffset(); // advance per message, inside the loop
    }

    if (msglist.size() >= Math.round(limit / inputReq.getApplicationArea().getReqInfo().size())
            || (endTime - startTime) >= waitTime) {
        log.info("Wait condition has been met... exiting the fetch loop. recordCount - {}, time exhausted - {} ms.",
                msglist.size(), (endTime - startTime));
        end = true;
    }
} while (!end);
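Note that this old loop exits on either a record-count limit or an elapsed-time cap, while the new poll loop quoted below only checks the count. Here is a sketch of carrying both exit conditions over to the new consumer; it reuses the surrounding names from this thread (consumer, msglist, readOffset, limit, waitTime, log) and assumes the usual kafka-clients and org.json imports.

```java
// Sketch: same two exit conditions as the 0.8 loop, on the new API.
long startTime = System.currentTimeMillis();
boolean end = false;
do {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<String, String> r : records) {
        msglist.add(new JSONObject(r.value()));
        readOffset = r.offset() + 1; // next offset to hand back to the caller
    }
    long elapsed = System.currentTimeMillis() - startTime;
    if (msglist.size() >= limit || elapsed >= waitTime) {
        log.info("Exiting fetch loop. recordCount - {}, time exhausted - {} ms.",
                msglist.size(), elapsed);
        end = true;
    }
} while (!end);
```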
The new code is taking 8 seconds, which is more than double the time:

TopicPartition topicPartition = new TopicPartition("Test", 0);
try (KafkaConsumer<String, String> consumer =
        KafkaConsumerFactory.createConsumer(clientName, fetchSize)) {
    consumer.partitionsFor(topicName);
    consumer.assign(Collections.singletonList(topicPartition));
    // seekToBeginning/position must come after the consumer is created and assigned
    consumer.seekToBeginning(Collections.singletonList(topicPartition));
    long kafkaEarliestOffset = consumer.position(topicPartition);
    consumer.seek(topicPartition, readOffset);
    do {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1500));
    } while (!end);
}

public static KafkaConsumer<String, String> createConsumer(String clientName, int fetchSize) {
    Properties props = new Properties();
    String kafkaBrokerStr = Config.getConsumerPropValue("kafkabrokerslist");
    String groupId = Config.getConsumerPropValue("group.id");
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerStr); // broker list was read but never applied
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
    props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
    props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
    return new KafkaConsumer<String, String>(props);
}

How can I improve the performance of this?

On Sun, Nov 24, 2024 at 4:11 PM, giri mungi <girimung...@gmail.com> wrote:

Hi Yang,

Can I get the records from Kafka as bytes or in compressed form, so that fetching from Kafka takes less time? I could build the messages from those bytes. Is that possible?

Can you please give suggestions on this?

Thanks,
Giridar
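On the raw-bytes idea: swapping in ByteArrayDeserializer lets the consumer return byte[] values, so the String decode can be deferred or skipped. A minimal sketch with placeholder broker and topic names follows; note that if the topic is compressed, the client library still decompresses batches internally, so this only saves the bytes-to-String step.

```java
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class RawBytesRead {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        TopicPartition tp = new TopicPartition("Test", 0);
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(tp));
            consumer.seek(tp, 0L);
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(1500));
            for (ConsumerRecord<byte[], byte[]> r : records) {
                byte[] raw = r.value(); // no String decode on the hot path
                // decode only when the message is actually needed:
                String decoded = new String(raw, StandardCharsets.UTF_8);
                System.out.println(decoded.length());
            }
        }
    }
}
```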
On Sun, Nov 24, 2024 at 3:50 PM, giri mungi <girimung...@gmail.com> wrote:

Hi Yang,

Thanks for your reply.

Now, what should I do to improve performance? The old Kafka code performed well.

These are the properties:

props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");

How can I decrease this time? Please suggest.

Thanks,
Giridar

On Sun, Nov 24, 2024 at 3:15 PM, PoAn Yang <yangp...@gmail.com> wrote:

Hi Giridar,

> Code explanation: fetching records is taking time for the first poll.
> Poll Records Count: 500 diff: 1284
> Poll Records Count: 500 diff: 3
>
> For the first 500 records it took 1284 ms and the next 500 records took 3 ms.
>
> Why this much difference? I would like to improve the performance of the first poll.

IIRC, a consumer has a FetchBuffer (cache). If a FetchResponse can return more records than `max.poll.records`, the extra records are stored in the FetchBuffer. That is why the second poll can get data in a short time. IMO, we don't need to balance each poll time, because that would mean giving up the local cache.

> For example, if I give "0" as the input offset, it takes about 6 seconds and does not even return 500 records; it gets only around 200-300 records per poll and takes a lot of time. Why is this happening, and how can I avoid it?
>
> Poll Records Count :292 Time taken :1227 ms
> Poll Records Count :292 Time taken :1181 ms
> Poll Records Count :296 Time taken :1234 ms
> Poll Records Count :292 Time taken :1133 ms
>
> If I give an offset of 110999, it is faster and returns 500 records per poll. Why this difference?
>
> Poll Records Count :500 Time taken :1284 ms
> Poll Records Count :500 Time taken :3 ms

IIUC, a FetchRequest is limited by `fetch.max.bytes`. If the record size from offset "0" is bigger than from offset "110999", then a FetchResponse returns fewer records.

Please correct me if I misunderstood anything.

Thanks,
PoAn
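PoAn's FetchBuffer point is easy to observe by timing consecutive polls from one consumer. A sketch with placeholder broker and topic follows; expect the first poll to pay for the network fetch and later polls to be served from the already-fetched buffer.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PollTiming {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500"); // one fetch may cover several polls

        TopicPartition tp = new TopicPartition("Test", 0);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(tp));
            consumer.seek(tp, 0L);
            for (int i = 0; i < 4; i++) {
                long t0 = System.currentTimeMillis();
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2000));
                // Expect: first iteration slow (network fetch), later ones fast (buffered).
                System.out.printf("poll %d: %d records in %d ms%n",
                        i, records.count(), System.currentTimeMillis() - t0);
            }
        }
    }
}
```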
On Nov 24, 2024, at 2:09 PM, giri mungi <girimung...@gmail.com> wrote:

Hi Team,

Good day to you.

I am Giridhar. I need your suggestions on Kafka performance improvement, please.

Scenario: the user gives an offset as input, and based on that offset we need to return the next 1000 messages from the Kafka topic, along with the next offset. The Kafka topic contains only one partition.

We are trying to migrate from old Kafka to new Kafka. In the old Kafka we were using code like:

Old code (kafka-clients 0.8.1):

FetchRequest req = new FetchRequestBuilder().clientId(clientName)
        .addFetch(a_topic, a_partition, readOffset, fetchSize).build();
FetchResponse fetchResponse = consumer.fetch(req);
ByteBufferMessageSet set = fetchResponse.messageSet(a_topic, a_partition);

This code is super fast, and we are trying to achieve the same with the KafkaConsumer API but are seeing slowness.

New KafkaConsumer code (kafka-clients 3.6.1):

TopicPartition topicPartition = new TopicPartition("Test", 0);
try (KafkaConsumer<String, String> consumer =
        KafkaConsumerFactory.createConsumer(clientName, fetchSize)) {
    consumer.assign(Collections.singletonList(topicPartition));
    // seekToBeginning/position must come after the consumer is created and assigned
    consumer.seekToBeginning(Collections.singletonList(topicPartition));
    long kafkaEarliestOffset = consumer.position(topicPartition);
    consumer.seek(topicPartition, readOffset);
    do {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1500));
    } while (!end);
}

public static KafkaConsumer<String, String> createConsumer(String clientName, int fetchSize) {
    Properties props = new Properties();
    String kafkaBrokerStr = Config.getConsumerPropValue("kafkabrokerlist");
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerStr); // broker list was read but never applied
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
    props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
    props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
    return new KafkaConsumer<String, String>(props);
}
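One defensive idea, not from the thread itself: since the starting offset comes from user input, it may be worth clamping it into the partition's valid range before seeking, so a stale value does not silently trigger auto.offset.reset. A sketch; beginningOffsets/endOffsets are standard KafkaConsumer calls.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Sketch: clamp a user-supplied offset into the partition's valid range
// before seeking. Seeking to the end offset simply yields no records.
static long clampOffset(KafkaConsumer<String, String> consumer, TopicPartition tp, long requested) {
    List<TopicPartition> tps = Collections.singletonList(tp);
    Map<TopicPartition, Long> begin = consumer.beginningOffsets(tps);
    Map<TopicPartition, Long> end = consumer.endOffsets(tps);
    return Math.max(begin.get(tp), Math.min(requested, end.get(tp)));
}
```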
Code explanation: fetching records takes time on the first poll.

Poll Records Count: 500 diff: 1284
Poll Records Count: 500 diff: 3

For the first 500 records it took 1284 ms, and the next 500 records took 3 ms.

Why this much difference? I would like to improve the first poll time.

1) How can I fetch the first 500 records in less time?

I am also seeing one strange issue. My Kafka topic, which has one partition, contains some 5 lakh (500,000) records. The starting records take more time to fetch from Kafka.

For example, if I give "0" as the input offset, it takes about 6 seconds and does not even return 500 records; it gets only around 200-300 records per poll and takes a lot of time. Why is this happening, and how can I avoid it?

Poll Records Count :292 Time taken :1227 ms
Poll Records Count :292 Time taken :1181 ms
Poll Records Count :296 Time taken :1234 ms
Poll Records Count :292 Time taken :1133 ms

If I give an offset of 110999, it is faster and returns 500 records per poll. Why this difference?

Poll Records Count :500 Time taken :1284 ms
Poll Records Count :500 Time taken :3 ms

Please give your suggestions on this.

Regards,
Giridar
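To test PoAn's size hypothesis directly, one could sample the average serialized record size at offset 0 versus offset 110999: if early records are much larger, each FetchResponse (capped by fetch.max.bytes / max.partition.fetch.bytes) holds fewer of them, which would explain the 292-record polls. A diagnostic sketch reusing a consumer and TopicPartition as set up earlier in this thread; serializedKeySize() returns -1 for null keys, hence the clamp.

```java
import java.time.Duration;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Sketch: sample the average serialized record size at a given offset.
static void sampleAvgSize(KafkaConsumer<String, String> consumer, TopicPartition tp, long offset) {
    consumer.seek(tp, offset);
    ConsumerRecords<String, String> sample = consumer.poll(Duration.ofMillis(2000));
    long totalBytes = 0;
    for (ConsumerRecord<String, String> r : sample) {
        totalBytes += r.serializedValueSize() + Math.max(r.serializedKeySize(), 0);
    }
    if (sample.count() > 0) {
        System.out.printf("offset %d: ~%d bytes/record over %d records%n",
                offset, totalBytes / sample.count(), sample.count());
    }
}
```

Calling this once with offset 0 and once with offset 110999 would confirm or rule out the record-size explanation before any further tuning.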