Hi all,
The below are the two implementation logs : 1) old kafka broker with old kafka client version (0.8 etc) 2) new kafka brokers (3.6.1) with new kafka client version (3.6.1) . The old does not have authentication but new kafka we are using authentication..we went to new one for security and robustness. *Old kafka brokers with old kafka api implementation: Here we are using * 2025-01-16 08:46:12,380 INFO (com.cisco.cr.kafka.controller.EventsController:98) [http-nio-8080-exec-8] - Consume process begin... 2025-01-16 08:46:12,380 INFO (com.test.cr.kafka.services.NextgenConsumerDriver:82) [http-nio-8080-exec-8] - Validating request... 2025-01-16 08:46:12,387 INFO (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-8] - Latest offset in Kafka for topic: crt_org_ce_ts1crt & partition: 0 is 16136630 and in user input is 16130077 2025-01-16 08:46:12,387 INFO (com.test.cr.kafka.services.TestKafkaConsumer:117) [http-nio-8080-exec-8] - Earliest offset in Kafka for topic: crt_org_ce_ts1crt & partition: 0 is 16129521 and in user input is 16130077 2025-01-16 08:46:12,415 INFO (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8] - Total size of messages read from offset 16130077 - 1043773 KB. 2025-01-16 08:46:12,502 INFO (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8] - Total size of messages read from offset 16130372 - 1043496 KB. 2025-01-16 08:46:12,543 INFO (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8] - Total size of messages read from offset 16130670 - 1044428 KB. 2025-01-16 08:46:12,587 INFO (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8] - Total size of messages read from offset 16130968 - 1044939 KB. 2025-01-16 08:46:12,622 INFO (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8] - Total size of messages read from offset 16131265 - 1045057 KB. 2025-01-16 08:46:12,650 INFO (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8] - Total size of messages read from offset 16131567 - 1042968 KB. 2025-01-16 08:46:12,676 INFO (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8] - Total size of messages read from offset 16131863 - 1043547 KB. 2025-01-16 08:46:12,698 INFO (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8] - Total size of messages read from offset 16132161 - 1045497 KB. 2025-01-16 08:46:12,723 INFO (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8] - Total size of messages read from offset 16132459 - 1043030 KB. 2025-01-16 08:46:12,746 INFO (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8] - Total size of messages read from offset 16132755 - 1045957 KB. 2025-01-16 08:46:12,772 INFO (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8] - Total size of messages read from offset 16133054 - 1043704 KB. 2025-01-16 08:46:12,789 INFO (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8] - Total size of messages read from offset 16133350 - 533178 KB. 2025-01-16 08:46:12,805 INFO (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8] - Total size of messages read from offset 16133499 - 1042816 KB. 2025-01-16 08:46:12,832 INFO (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8] - Total size of messages read from offset 16133797 - 1045895 KB. 2025-01-16 08:46:12,859 INFO (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8] - Total size of messages read from offset 16134093 - 1044584 KB. 2025-01-16 08:46:12,884 INFO (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8] - Total size of messages read from offset 16134398 - 1045775 KB. 2025-01-16 08:46:12,906 INFO (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8] - Total size of messages read from offset 16134702 - 1043018 KB. 2025-01-16 08:46:12,930 INFO (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8] - Total size of messages read from offset 16134999 - 1045228 KB. 2025-01-16 08:46:12,952 INFO (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8] - Total size of messages read from offset 16135290 - 1044921 KB. 2025-01-16 08:46:12,979 INFO (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8] - Total size of messages read from offset 16135579 - 1046197 KB. 2025-01-16 08:46:13,002 INFO (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8] - Total size of messages read from offset 16135876 - 1042978 KB. 2025-01-16 08:46:13,029 INFO (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8] - Total size of messages read from offset 16136172 - 1045985 KB. 2025-01-16 08:46:13,050 INFO (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8] - Total size of messages read from offset 16136463 - 600146 KB. 2025-01-16 08:46:13,057 INFO (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8] - Total size of messages read from offset 16136630 - 0 KB. 2025-01-16 08:46:14,059 INFO (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8] - Total size of messages read from offset 16136630 - 0 KB. 2025-01-16 08:46:15,061 INFO (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8] - Total size of messages read from offset 16136630 - 0 KB. 2025-01-16 08:46:15,061 INFO (com.test.cr.kafka.services.TestKafkaConsumer:221) [http-nio-8080-exec-8] - No messages were found in 3 successive fetches. Stopping the consume process here. 2025-01-16 08:46:15,061 INFO (com.test.cr.kafka.services.TestKafkaConsumer:242) [http-nio-8080-exec-8] - Closing consumer... crt_consumer_crt_org_ce_ts1crt_0_Webex-Webex 2025-01-16 08:46:15,061 INFO (com.test.cr.kafka.services.TestKafkaConsumer:249) [http-nio-8080-exec-8] - Summary ::: Total number of messages read from offset 16130077 to 16136630 are - 6553. Count of filtered results - 54 2025-01-16 08:46:15,081 INFO (com.cisco.cr.kafka.controller.EventsController:154) [http-nio-8080-exec-8] - Consume process end. old code: In old code we were using FetchRequest,FetchResponse,SimpleConsumer without authentication and this is low level API public class ChangeEventsKafkaConsumer{ private CEConsumeResponse ceResponse; public ChangeEventsKafkaConsumer() { super(); } public ChangeEventsKafkaConsumer(ConfigBean confBean, CEConsumeResponse res, long offset) { super(confBean); this.ceResponse = res; this.offset = offset; } @Override public long readMessages(long a_maxRead, String a_topic, int a_partition, List<String> ng_replicaBrokers, int a_port) throws CustomException { return 0l; } public List<JSONObject> consume(long offset, String a_topic, int a_partition, List<String> ng_replicaBrokers, int a_port, CEConsumeRequest inputReq) throws CustomException { List<JSONObject> msglist = new ArrayList<JSONObject>(); int waitTime = Integer.valueOf(Config .getConsumerPropValue("pull.wait.time")); int limit = Integer.valueOf(Config.getConsumerPropValue("pull.size.limit")); int fetchSize = Integer.valueOf(Config .getConsumerPropValue("pull.each.fetch.size")); int emptyLoopLimit = Integer.valueOf(Config .getConsumerPropValue("pull.empty.loop.limit")); /* * Fetching Metadata. */ PartitionMetadata metadata = findLeader(ng_replicaBrokers, a_port, a_topic, a_partition); if (metadata == null) { log.error("Can't find metadata for Topic and Partition."); throw new CustomException( "Can't find metadata for Topic and Partition."); } if (metadata.leader() == null) { log.error("Can't find Leader for Topic and Partition."); throw new CustomException( "Can't find Leader for Topic and Partition."); } /* * Getting lead broker. */ String leadBroker = metadata.leader().host(); log.debug("Lead broker for Partion : {}, Topic :{} is {}", a_partition, a_topic, leadBroker); SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, configBean.getConnectionTimeOut(), configBean.getKafkaBufferSize(), clientName); /* * Fetching kafka offset from Kafka */ long kafkaOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName); /* * Getting earliest available offset in kafka. */ long kafkaEarliestOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName); /* * User input offset. */ long readOffset = offset; log.info( "Latest offset in Kafka for topic: {} & partition: {} is {} and in user input is {}", a_topic, a_partition, kafkaOffset, readOffset); log.info( "Earliest offset in Kafka for topic: {} & partition: {} is {} and in user input is {}", a_topic, a_partition, kafkaEarliestOffset, readOffset); if (readOffset == 0 && readOffset < kafkaEarliestOffset) { log.warn("Resetting the offset to earliest available offset in kafka."); readOffset = kafkaEarliestOffset; } boolean end = false; long startTime = Calendar.getInstance().getTimeInMillis(); long endTime = Calendar.getInstance().getTimeInMillis(); int emptyFetchCount = 0; do { FetchRequest req = new FetchRequestBuilder().clientId(clientName) .addFetch(a_topic, a_partition, readOffset, fetchSize) .build(); FetchResponse fetchResponse = consumer.fetch(req); /* * Reading data from kafka response. */ JSONObject obj = null; ByteBufferMessageSet set = fetchResponse.messageSet(a_topic, a_partition); log.info("Total size of messages read from offset {} - {} KB.", readOffset, set.sizeInBytes()); for (MessageAndOffset messageAndOffset : set) { String message = null; if (messageAndOffset.offset() < readOffset) { log.warn("Found an old offset: {} ,Expecting: {}", messageAndOffset.offset(), readOffset); continue; } ByteBuffer payload = messageAndOffset.message().payload(); byte[] bytes = new byte[payload.limit()]; payload.get(bytes); try { message = new String(bytes, "UTF-8"); } catch (UnsupportedEncodingException ue) { log.warn(ue.getMessage(), ue); message = new String(bytes); } log.debug( "client name : {} , Offset is : {} , Message is : {} ", clientName, String.valueOf(messageAndOffset.offset()), message); CONSUME_LOG.debug(a_topic + "\t" + a_partition + "\t" + String.valueOf(messageAndOffset.offset())); obj = new JSONObject(message); if (!inputReq.getApplicationArea().isSubscription() || (obj.has("SUBSCRIBER_NAMES") && obj.getString( "SUBSCRIBER_NAMES").contains( inputReq.getSender().getReferenceID()))) { if (inputReq.getApplicationArea().getChangeEventsType() == null || (obj.has("NOTIFICATION_EVENT_NAME") && inputReq .getApplicationArea() .getChangeEventsType() .contains( obj.getString("NOTIFICATION_EVENT_NAME")))) { msglist.add(new JSONObject(message)); } } readOffset = messageAndOffset.nextOffset(); } endTime = Calendar.getInstance().getTimeInMillis(); if (msglist.size() >= Math.round(limit / inputReq.getApplicationArea().getReqInfo().size()) || (endTime - startTime) >= waitTime) { log.info( "Wait condition has been met... exiting the fetch loop. recordCount - {}, time exhausted - {} ms.", msglist.size(), (endTime - startTime)); end = true; } else if (set.sizeInBytes() == 0) { emptyFetchCount++; try { if(emptyFetchCount == emptyLoopLimit) { log.info("No messages were found in 3 successive fetches. Stopping the consume process here."); end = true; } else { Thread.sleep(1000); } } catch (InterruptedException ie) { CONSUME_LOG.warn(ie.getMessage(), ie); } } else { emptyFetchCount = 0; } req = null; } while (!end); *New kafkaconsumer implementation with new kafka brokers:* 2025-01-16 08:46:56,307 INFO (com.test.cr.kafka.controller.EventsController:99) [http-nio-8080-exec-3] - Consume process begin... 2025-01-16 08:46:56,307 INFO (com.test.cr.kafka.services.NextgenConsumerDriver:76) [http-nio-8080-exec-3] - Validating request... 2025-01-16 08:46:56,307 INFO (com.test.cr.kafka.services.TestKafkaConsumer:68) [http-nio-8080-exec-3] - Consume messages Topic: ORG_CHANGE_EVENT_YS1_18_OCT_2024 & partition: 0 2025-01-16 08:46:56,307 INFO (com.test.cr.kafka.services.KafkaConsumerFactory:45) [http-nio-8080-exec-3] - KafkaUser:app_client kafkaBrokerStr: ys1kafka-aln-01.test.com:9092,ys1kafka-aln-02.cisco.com:9092, ys1kafka-rdn-01.test.com:9092,ys1kafka-rdn-02.test.com:9092 GroupID:crt_org_change_events_lt 2025-01-16 08:46:56,308 INFO (org.apache.kafka.clients.consumer.ConsumerConfig:370) [http-nio-8080-exec-3] - ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.include.jmx.reporter = true auto.offset.reset = none bootstrap.servers = [kafka-aln-01.test.com:9092, kafka-aln-02.test.com:9092, kafka-rdn-01.test.com:9092, kafka-rdn-02.test.com:9092] check.crcs = true client.dns.lookup = use_all_dns_ips client.id = crt_consumer_http-nio-8080-exec-3 client.rack = connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = false exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 10 fetch.min.bytes = 1 group.id = group.instance.id = null heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = true internal.throw.on.fetch.stable.offset.unsupported = false isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = [hidden] sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.connect.timeout.ms = null sasl.login.read.timeout.ms = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.login.retry.backoff.max.ms = 10000 sasl.login.retry.backoff.ms = 100 sasl.mechanism = PLAIN sasl.oauthbearer.clock.skew.seconds = 30 sasl.oauthbearer.expected.audience = null sasl.oauthbearer.expected.issuer = null sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000 sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000 sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100 sasl.oauthbearer.jwks.endpoint.url = null sasl.oauthbearer.scope.claim.name = scope sasl.oauthbearer.sub.claim.name = sub sasl.oauthbearer.token.endpoint.url = null security.protocol = SASL_SSL security.providers = null send.buffer.bytes = 131072 session.timeout.ms = 45000 socket.connection.setup.timeout.max.ms = 30000 socket.connection.setup.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.3] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.certificate.chain = null ssl.keystore.key = null ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.3 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.certificates = null ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer 2025-01-16 08:46:56,308 WARN (org.apache.kafka.clients.consumer.KafkaConsumer:688) [http-nio-8080-exec-3] - [Consumer clientId=crt_consumer_http-nio-8080-exec-3, groupId=] Support for using the empty group id by consumers is deprecated and will be removed in the next major release. 2025-01-16 08:46:56,330 INFO (org.apache.kafka.common.utils.AppInfoParser:119) [http-nio-8080-exec-3] - Kafka version: 3.6.1 2025-01-16 08:46:56,330 INFO (org.apache.kafka.common.utils.AppInfoParser:120) [http-nio-8080-exec-3] - Kafka commitId: 5e3c2b738d253ff5 2025-01-16 08:46:56,330 INFO (org.apache.kafka.common.utils.AppInfoParser:121) [http-nio-8080-exec-3] - Kafka startTimeMs: 1737046016330 2025-01-16 08:46:56,330 INFO (org.apache.kafka.clients.consumer.KafkaConsumer:1067) [http-nio-8080-exec-3] - [Consumer clientId=crt_consumer_http-nio-8080-exec-3, groupId=] Assigned to partition(s): ORG_CHANGE_EVENT_YS1_18_OCT_2024-0 2025-01-16 08:46:56,330 INFO (org.apache.kafka.clients.consumer.KafkaConsumer:1067) [http-nio-8080-exec-3] - [Consumer clientId=crt_consumer_http-nio-8080-exec-3, groupId=] Assigned to partition(s): ORG_CHANGE_EVENT_YS1_18_OCT_2024-0 2025-01-16 08:46:56,832 INFO (com.test.cr.kafka.services.TestKafkaConsumer:95) [http-nio-8080-exec-3] - EarliestOffset :0 LatestOffset :16136566 Time :502 ms 2025-01-16 08:46:56,832 INFO (com.test.cr.kafka.services.TestKafkaConsumer:97) [http-nio-8080-exec-3] - Latest & Earliest offset in Kafka for topic: ORG_CHANGE_EVENT_YS1_18_OCT_2024 & partition: 0 Latest is 16136566 Earliest is 0 and in user input is 16130077 2025-01-16 08:46:56,832 INFO (org.apache.kafka.clients.consumer.KafkaConsumer:1564) [http-nio-8080-exec-3] - [Consumer clientId=crt_consumer_http-nio-8080-exec-3, groupId=] Seeking to offset 16130077 for partition ORG_CHANGE_EVENT_YS1_18_OCT_2024-0 2025-01-16 08:46:57,157 INFO (org.apache.kafka.clients.Metadata:287) [http-nio-8080-exec-3] - [Consumer clientId=crt_consumer_http-nio-8080-exec-3, groupId=] Cluster ID: WyVVmQEwTbeRp73DYJgk7Q 2025-01-16 08:46:58,741 INFO (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3] - Poll Records Count :500 2025-01-16 08:46:58,823 INFO (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3] - Poll Records Count :500 2025-01-16 08:46:58,898 INFO (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3] - Poll Records Count :500 2025-01-16 08:46:58,964 INFO (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3] - Poll Records Count :188 2025-01-16 08:47:00,243 INFO (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3] - Poll Records Count :500 2025-01-16 08:47:00,316 INFO (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3] - Poll Records Count :500 2025-01-16 08:47:00,383 INFO (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3] - Poll Records Count :500 2025-01-16 08:47:00,455 INFO (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3] - Poll Records Count :235 2025-01-16 08:47:01,735 INFO (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3] - Poll Records Count :500 2025-01-16 08:47:01,799 INFO (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3] - Poll Records Count :500 2025-01-16 08:47:01,966 INFO (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3] - Poll Records Count :500 2025-01-16 08:47:02,035 INFO (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3] - Poll Records Count :294 2025-01-16 08:47:02,912 INFO (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3] - Poll Records Count :500 2025-01-16 08:47:02,981 INFO (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3] - Poll Records Count :500 2025-01-16 08:47:03,050 INFO (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3] - Poll Records Count :272 2025-01-16 08:47:05,087 INFO (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3] - Poll Records Count :0 2025-01-16 08:47:08,088 INFO (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3] - Poll Records Count :0 2025-01-16 08:47:11,088 INFO (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3] - Poll Records Count :0 2025-01-16 08:47:11,088 INFO (com.test.cr.kafka.services.TestKafkaConsumer:157) [http-nio-8080-exec-3] - No messages were found in 3 successive fetches. Stopping the consume process here. 2025-01-16 08:47:11,088 INFO (com.test.cr.kafka.services.TestKafkaConsumer:179) [http-nio-8080-exec-3] - Summary ::: Total number of messages read from offset 16130077 to 16136566 are - 6489. Count of filtered results - 54 2025-01-16 08:47:11,088 INFO (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1025) [http-nio-8080-exec-3] - [Consumer clientId=crt_consumer_http-nio-8080-exec-3, groupId=] Resetting generation and member id due to: consumer pro-actively leaving the group 2025-01-16 08:47:11,089 INFO (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1072) [http-nio-8080-exec-3] - [Consumer clientId=crt_consumer_http-nio-8080-exec-3, groupId=] Request joining group due to: consumer pro-actively leaving the group 2025-01-16 08:47:11,138 INFO (org.apache.kafka.common.metrics.Metrics:693) [http-nio-8080-exec-3] - Metrics scheduler closed 2025-01-16 08:47:11,139 INFO (org.apache.kafka.common.metrics.Metrics:697) [http-nio-8080-exec-3] - Closing reporter org.apache.kafka.common.metrics.JmxReporter 2025-01-16 08:47:11,139 INFO (org.apache.kafka.common.metrics.Metrics:703) [http-nio-8080-exec-3] - Metrics reporters closed 2025-01-16 08:47:11,140 INFO (org.apache.kafka.common.utils.AppInfoParser:83) [http-nio-8080-exec-3] - App info kafka.consumer for crt_consumer_http-nio-8080-exec-3 unregistered 2025-01-16 08:47:11,159 INFO (com.cisco.cr.kafka.controller.EventsController:156) [http-nio-8080-exec-3] - Consume process end. New code:Here we are using KafkaConsumer API with SASL authentication public List<JSONObject> consume(long offset, String topicName,int partition,CEConsumeRequest inputReq) throws CustomException { List<JSONObject> msglist = new ArrayList<JSONObject>(); int waitTime = Integer.valueOf(Config .getConsumerPropValue("pull.wait.time")); int limit = Integer.valueOf(Config.getConsumerPropValue("pull.size.limit")); int emptyLoopLimit = Integer.valueOf(Config .getConsumerPropValue("pull.empty.loop.limit")); int fetchSize = Integer.valueOf(Config .getConsumerPropValue("pull.each.fetch.size")); log.info( "Consume messages Topic: {} & partition: {}", topicName, partition); TopicPartition topicPartition = new TopicPartition(topicName, partition); long readOffset = offset; clientName = "crt_consumer_" + Thread.currentThread().getName(); try (KafkaConsumer<String,String> consumer = KafkaConsumerFactory.createConsumer(clientName,fetchSize,topicPartition)){ consumer.assign(Collections.singletonList(topicPartition)); long OffsetStartTime = System.currentTimeMillis(); long OffsetEndTime= System.currentTimeMillis(); long kafkaEarliestOffset=0; long latestOffset=0; Admin adminClient = KafkaConsumerFactory.getAdminClient(); try { kafkaEarliestOffset = adminClient.listOffsets( Map.of(topicPartition, OffsetSpec.earliest()) ).all().get().get(topicPartition).offset(); latestOffset = adminClient.listOffsets( Map.of(topicPartition, OffsetSpec.latest()) ).all().get().get(topicPartition).offset(); OffsetEndTime = System.currentTimeMillis(); } catch (Exception e) { e.printStackTrace(); } log.info("EarliestOffset :{} LatestOffset :{} Time :{} ms",kafkaEarliestOffset,latestOffset,(OffsetEndTime - OffsetStartTime)); log.info( "Latest & Earliest offset in Kafka for topic: {} & partition: {} Latest is {} Earliest is {} and in user input is {}", topicName, partition, latestOffset, kafkaEarliestOffset,readOffset); if (readOffset == 0 && readOffset < kafkaEarliestOffset) { log.warn("Resetting the offset to earliest available offset in kafka."); readOffset = kafkaEarliestOffset; } boolean end = false; long startTime = System.currentTimeMillis(); long endTime = System.currentTimeMillis(); int emptyFetchCount = 0; consumer.seek(topicPartition, offset); do { JSONObject obj = null; ConsumerRecords < String, String > records = consumer.poll(Duration.ofMillis(2000)); log.info("Poll Records Count :{}",records.count()); for (ConsumerRecord< String, String > consumerRecord: records) { long currentOffset = consumerRecord.offset(); if (currentOffset < readOffset) { log.warn("Found an old offset: {}, Expecting: {}", currentOffset, readOffset); continue; } String message = consumerRecord.value(); log.debug( "client name : {} , Offset is : {} , Message is : {} ", clientName, readOffset, message); CONSUME_LOG.debug(topicName + "\t" + partition + "\t" + String.valueOf(currentOffset)); obj = new JSONObject(message); if (!inputReq.getApplicationArea().isSubscription() || (obj.has("SUBSCRIBER_NAMES") && obj.getString( "SUBSCRIBER_NAMES").contains( inputReq.getSender().getReferenceID()))) { if (inputReq.getApplicationArea().getChangeEventsType() == null || (obj.has("NOTIFICATION_EVENT_NAME") && inputReq .getApplicationArea() .getChangeEventsType() .contains( obj.getString("NOTIFICATION_EVENT_NAME")))) { msglist.add(obj); } } readOffset = currentOffset + 1; } endTime = System.currentTimeMillis(); if (msglist.size() >= Math.round(limit / inputReq.getApplicationArea().getReqInfo().size()) || (endTime - startTime) >= waitTime) { log.info( "Wait condition has been met... exiting the fetch loop. recordCount - {}, time exhausted - {} ms.", msglist.size(), (endTime - startTime)); end = true; //consumer.commitSync(); } else if (records.isEmpty()) { emptyFetchCount++; try { if(emptyFetchCount == emptyLoopLimit) { log.info("No messages were found in 3 successive fetches. Stopping the consume process here."); end = true; } else { Thread.sleep(1000); } } catch (InterruptedException ie) { CONSUME_LOG.warn(ie.getMessage(), ie); } } else { emptyFetchCount = 0; } } while (!end); The new kafka consumer API is very slow..i have tried lot of settings like increasing MAX_POLL_RECORDS_CONFIG,FETCH_MIN_BYTES_CONFIG etc..but no use. Is this time expected with the new KafkaConsumer API.How to improve my timings. please help on this. Regards, Giri