Hi all,

Below are the logs and code for two implementations:
1) old Kafka brokers with an old Kafka client version (0.8 etc.)
2) new Kafka brokers (3.6.1) with the new Kafka client version (3.6.1).

The old setup does not use authentication, but the new one does. We moved
to the new setup for security and robustness.



*Old Kafka brokers with old Kafka API implementation (logs):*

2025-01-16 08:46:12,380 INFO
 (com.cisco.cr.kafka.controller.EventsController:98) [http-nio-8080-exec-8]
- Consume process begin...
2025-01-16 08:46:12,380 INFO
 (com.test.cr.kafka.services.NextgenConsumerDriver:82)
[http-nio-8080-exec-8] - Validating request...
2025-01-16 08:46:12,387 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-8]
- Latest offset in Kafka for topic: crt_org_ce_ts1crt & partition: 0 is
16136630 and in user input is 16130077
2025-01-16 08:46:12,387 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:117) [http-nio-8080-exec-8]
- Earliest offset in Kafka for topic: crt_org_ce_ts1crt & partition: 0 is
16129521 and in user input is 16130077
2025-01-16 08:46:12,415 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16130077 - 1043773 KB.
2025-01-16 08:46:12,502 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16130372 - 1043496 KB.
2025-01-16 08:46:12,543 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16130670 - 1044428 KB.
2025-01-16 08:46:12,587 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16130968 - 1044939 KB.
2025-01-16 08:46:12,622 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16131265 - 1045057 KB.
2025-01-16 08:46:12,650 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16131567 - 1042968 KB.
2025-01-16 08:46:12,676 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16131863 - 1043547 KB.
2025-01-16 08:46:12,698 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16132161 - 1045497 KB.
2025-01-16 08:46:12,723 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16132459 - 1043030 KB.
2025-01-16 08:46:12,746 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16132755 - 1045957 KB.
2025-01-16 08:46:12,772 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16133054 - 1043704 KB.
2025-01-16 08:46:12,789 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16133350 - 533178 KB.
2025-01-16 08:46:12,805 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16133499 - 1042816 KB.
2025-01-16 08:46:12,832 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16133797 - 1045895 KB.
2025-01-16 08:46:12,859 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16134093 - 1044584 KB.
2025-01-16 08:46:12,884 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16134398 - 1045775 KB.
2025-01-16 08:46:12,906 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16134702 - 1043018 KB.
2025-01-16 08:46:12,930 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16134999 - 1045228 KB.
2025-01-16 08:46:12,952 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16135290 - 1044921 KB.
2025-01-16 08:46:12,979 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16135579 - 1046197 KB.
2025-01-16 08:46:13,002 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16135876 - 1042978 KB.
2025-01-16 08:46:13,029 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16136172 - 1045985 KB.
2025-01-16 08:46:13,050 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16136463 - 600146 KB.
2025-01-16 08:46:13,057 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16136630 - 0 KB.
2025-01-16 08:46:14,059 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16136630 - 0 KB.
2025-01-16 08:46:15,061 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16136630 - 0 KB.
2025-01-16 08:46:15,061 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:221) [http-nio-8080-exec-8]
- No messages were found in 3 successive fetches. Stopping the consume
process here.
2025-01-16 08:46:15,061 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:242) [http-nio-8080-exec-8]
- Closing consumer... crt_consumer_crt_org_ce_ts1crt_0_Webex-Webex
2025-01-16 08:46:15,061 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:249) [http-nio-8080-exec-8]
- Summary ::: Total number of messages read from offset 16130077 to
16136630 are - 6553. Count of filtered results - 54
2025-01-16 08:46:15,081 INFO
 (com.cisco.cr.kafka.controller.EventsController:154)
[http-nio-8080-exec-8] - Consume process end.

Old code: here we were using FetchRequest, FetchResponse and SimpleConsumer
(the low-level API), without authentication.


public class ChangeEventsKafkaConsumer{


private CEConsumeResponse ceResponse;

public ChangeEventsKafkaConsumer() {
super();
}

public ChangeEventsKafkaConsumer(ConfigBean confBean,
CEConsumeResponse res, long offset) {
super(confBean);
this.ceResponse = res;
this.offset = offset;
}

@Override
public long readMessages(long a_maxRead, String a_topic, int a_partition,
List<String> ng_replicaBrokers, int a_port) throws CustomException {
return 0l;
}

public List<JSONObject> consume(long offset, String a_topic,
int a_partition, List<String> ng_replicaBrokers, int a_port,
CEConsumeRequest inputReq) throws CustomException {
List<JSONObject> msglist = new ArrayList<JSONObject>();

int waitTime = Integer.valueOf(Config
.getConsumerPropValue("pull.wait.time"));
int limit = Integer.valueOf(Config.getConsumerPropValue("pull.size.limit"));
int fetchSize = Integer.valueOf(Config
.getConsumerPropValue("pull.each.fetch.size"));
int emptyLoopLimit = Integer.valueOf(Config
.getConsumerPropValue("pull.empty.loop.limit"));

/*
* Fetching Metadata.
*/
PartitionMetadata metadata = findLeader(ng_replicaBrokers, a_port,
a_topic, a_partition);
if (metadata == null) {
log.error("Can't find metadata for Topic and Partition.");
throw new CustomException(
"Can't find metadata for Topic and Partition.");
}
if (metadata.leader() == null) {
log.error("Can't find Leader for Topic and Partition.");
throw new CustomException(
"Can't find Leader for Topic and Partition.");
}

/*
* Getting lead broker.
*/
String leadBroker = metadata.leader().host();
log.debug("Lead broker for Partion : {}, Topic :{} is {}", a_partition,
a_topic, leadBroker);
SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port,
configBean.getConnectionTimeOut(),
configBean.getKafkaBufferSize(), clientName);

/*
* Fetching kafka offset from Kafka
*/
long kafkaOffset = getLastOffset(consumer, a_topic, a_partition,
kafka.api.OffsetRequest.LatestTime(), clientName);

/*
* Getting earliest available offset in kafka.
*/
long kafkaEarliestOffset = getLastOffset(consumer, a_topic,
a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);

/*
* User input offset.
*/
long readOffset = offset;

log.info(
"Latest offset in Kafka for topic: {} & partition: {} is {} and in user input is {}",
a_topic, a_partition, kafkaOffset, readOffset);
log.info(
"Earliest offset in Kafka for topic: {} & partition: {} is {} and in user input is {}",
a_topic, a_partition, kafkaEarliestOffset, readOffset);

if (readOffset == 0 && readOffset < kafkaEarliestOffset) {
log.warn("Resetting the offset to earliest available offset in kafka.");
readOffset = kafkaEarliestOffset;
}

boolean end = false;
long startTime = Calendar.getInstance().getTimeInMillis();
long endTime = Calendar.getInstance().getTimeInMillis();
int emptyFetchCount = 0;
do {
FetchRequest req = new FetchRequestBuilder().clientId(clientName)
.addFetch(a_topic, a_partition, readOffset, fetchSize)
.build();

FetchResponse fetchResponse = consumer.fetch(req);



/*
* Reading data from kafka response.
*/
JSONObject obj = null;
ByteBufferMessageSet set = fetchResponse.messageSet(a_topic,
a_partition);
log.info("Total size of messages read from offset {} - {} KB.",
readOffset, set.sizeInBytes());
for (MessageAndOffset messageAndOffset : set) {
String message = null;
if (messageAndOffset.offset() < readOffset) {
log.warn("Found an old offset: {} ,Expecting: {}",
messageAndOffset.offset(), readOffset);
continue;
}

ByteBuffer payload = messageAndOffset.message().payload();

byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
try {
message = new String(bytes, "UTF-8");
} catch (UnsupportedEncodingException ue) {
log.warn(ue.getMessage(), ue);
message = new String(bytes);
}
log.debug(
"client name : {} , Offset is : {} , Message is : {} ",
clientName, String.valueOf(messageAndOffset.offset()),
message);
CONSUME_LOG.debug(a_topic + "\t" + a_partition + "\t"
+ String.valueOf(messageAndOffset.offset()));
obj = new JSONObject(message);
if (!inputReq.getApplicationArea().isSubscription()
|| (obj.has("SUBSCRIBER_NAMES") && obj.getString(
"SUBSCRIBER_NAMES").contains(
inputReq.getSender().getReferenceID()))) {
if (inputReq.getApplicationArea().getChangeEventsType() == null
|| (obj.has("NOTIFICATION_EVENT_NAME") && inputReq
.getApplicationArea()
.getChangeEventsType()
.contains(
obj.getString("NOTIFICATION_EVENT_NAME")))) {
msglist.add(new JSONObject(message));
}
}
readOffset = messageAndOffset.nextOffset();
}
endTime = Calendar.getInstance().getTimeInMillis();
if (msglist.size() >= Math.round(limit
/ inputReq.getApplicationArea().getReqInfo().size())
|| (endTime - startTime) >= waitTime) {
log.info(
"Wait condition has been met... exiting the fetch loop. recordCount - {}, time exhausted - {} ms.",
msglist.size(), (endTime - startTime));
end = true;
} else if (set.sizeInBytes() == 0) {
emptyFetchCount++;
try {
if(emptyFetchCount == emptyLoopLimit) {
log.info("No messages were found in 3 successive fetches. Stopping the consume process here.");
end = true;
} else {
Thread.sleep(1000);
}
} catch (InterruptedException ie) {
CONSUME_LOG.warn(ie.getMessage(), ie);
}
} else {
emptyFetchCount = 0;
}
req = null;
} while (!end);

*New kafkaconsumer implementation with new kafka brokers:*


2025-01-16 08:46:56,307 INFO
 (com.test.cr.kafka.controller.EventsController:99) [http-nio-8080-exec-3]
- Consume process begin...
2025-01-16 08:46:56,307 INFO
 (com.test.cr.kafka.services.NextgenConsumerDriver:76)
[http-nio-8080-exec-3] - Validating request...
2025-01-16 08:46:56,307 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:68) [http-nio-8080-exec-3] -
Consume messages Topic: ORG_CHANGE_EVENT_YS1_18_OCT_2024 & partition: 0
2025-01-16 08:46:56,307 INFO
 (com.test.cr.kafka.services.KafkaConsumerFactory:45)
[http-nio-8080-exec-3] - KafkaUser:app_client kafkaBrokerStr:
ys1kafka-aln-01.test.com:9092,ys1kafka-aln-02.cisco.com:9092,
ys1kafka-rdn-01.test.com:9092,ys1kafka-rdn-02.test.com:9092
GroupID:crt_org_change_events_lt
2025-01-16 08:46:56,308 INFO
 (org.apache.kafka.clients.consumer.ConsumerConfig:370)
[http-nio-8080-exec-3] - ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.include.jmx.reporter = true
        auto.offset.reset = none
        bootstrap.servers = [kafka-aln-01.test.com:9092,
kafka-aln-02.test.com:9092, kafka-rdn-01.test.com:9092,
kafka-rdn-02.test.com:9092]
        check.crcs = true
        client.dns.lookup = use_all_dns_ips
        client.id = crt_consumer_http-nio-8080-exec-3
        client.rack =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 10
        fetch.min.bytes = 1
        group.id =
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        internal.throw.on.fetch.stable.offset.unsupported = false
        isolation.level = read_uncommitted
        key.deserializer = class
org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class
org.apache.kafka.clients.consumer.RangeAssignor, class
org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = [hidden]
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.connect.timeout.ms = null
        sasl.login.read.timeout.ms = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.login.retry.backoff.max.ms = 10000
        sasl.login.retry.backoff.ms = 100
        sasl.mechanism = PLAIN
        sasl.oauthbearer.clock.skew.seconds = 30
        sasl.oauthbearer.expected.audience = null
        sasl.oauthbearer.expected.issuer = null
        sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
        sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
        sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
        sasl.oauthbearer.jwks.endpoint.url = null
        sasl.oauthbearer.scope.claim.name = scope
        sasl.oauthbearer.sub.claim.name = sub
        sasl.oauthbearer.token.endpoint.url = null
        security.protocol = SASL_SSL
        security.providers = null
        send.buffer.bytes = 131072
        session.timeout.ms = 45000
        socket.connection.setup.timeout.max.ms = 30000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.3
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class
org.apache.kafka.common.serialization.StringDeserializer

2025-01-16 08:46:56,308 WARN
 (org.apache.kafka.clients.consumer.KafkaConsumer:688)
[http-nio-8080-exec-3] - [Consumer
clientId=crt_consumer_http-nio-8080-exec-3, groupId=] Support for using the
empty group id by consumers is deprecated and will be removed in the next
major release.
2025-01-16 08:46:56,330 INFO
 (org.apache.kafka.common.utils.AppInfoParser:119) [http-nio-8080-exec-3] -
Kafka version: 3.6.1
2025-01-16 08:46:56,330 INFO
 (org.apache.kafka.common.utils.AppInfoParser:120) [http-nio-8080-exec-3] -
Kafka commitId: 5e3c2b738d253ff5
2025-01-16 08:46:56,330 INFO
 (org.apache.kafka.common.utils.AppInfoParser:121) [http-nio-8080-exec-3] -
Kafka startTimeMs: 1737046016330
2025-01-16 08:46:56,330 INFO
 (org.apache.kafka.clients.consumer.KafkaConsumer:1067)
[http-nio-8080-exec-3] - [Consumer
clientId=crt_consumer_http-nio-8080-exec-3, groupId=] Assigned to
partition(s): ORG_CHANGE_EVENT_YS1_18_OCT_2024-0
2025-01-16 08:46:56,330 INFO
 (org.apache.kafka.clients.consumer.KafkaConsumer:1067)
[http-nio-8080-exec-3] - [Consumer
clientId=crt_consumer_http-nio-8080-exec-3, groupId=] Assigned to
partition(s): ORG_CHANGE_EVENT_YS1_18_OCT_2024-0
2025-01-16 08:46:56,832 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:95) [http-nio-8080-exec-3] -
EarliestOffset :0 LatestOffset :16136566 Time :502 ms
2025-01-16 08:46:56,832 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:97) [http-nio-8080-exec-3] -
Latest & Earliest offset in Kafka for topic:
ORG_CHANGE_EVENT_YS1_18_OCT_2024 & partition: 0 Latest is 16136566 Earliest
is 0 and in user input is 16130077
2025-01-16 08:46:56,832 INFO
 (org.apache.kafka.clients.consumer.KafkaConsumer:1564)
[http-nio-8080-exec-3] - [Consumer
clientId=crt_consumer_http-nio-8080-exec-3, groupId=] Seeking to offset
16130077 for partition ORG_CHANGE_EVENT_YS1_18_OCT_2024-0
2025-01-16 08:46:57,157 INFO  (org.apache.kafka.clients.Metadata:287)
[http-nio-8080-exec-3] - [Consumer
clientId=crt_consumer_http-nio-8080-exec-3, groupId=] Cluster ID:
WyVVmQEwTbeRp73DYJgk7Q
2025-01-16 08:46:58,741 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :500
2025-01-16 08:46:58,823 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :500
2025-01-16 08:46:58,898 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :500
2025-01-16 08:46:58,964 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :188
2025-01-16 08:47:00,243 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :500
2025-01-16 08:47:00,316 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :500
2025-01-16 08:47:00,383 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :500
2025-01-16 08:47:00,455 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :235
2025-01-16 08:47:01,735 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :500
2025-01-16 08:47:01,799 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :500
2025-01-16 08:47:01,966 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :500
2025-01-16 08:47:02,035 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :294
2025-01-16 08:47:02,912 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :500
2025-01-16 08:47:02,981 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :500
2025-01-16 08:47:03,050 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :272
2025-01-16 08:47:05,087 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :0
2025-01-16 08:47:08,088 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :0
2025-01-16 08:47:11,088 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :0
2025-01-16 08:47:11,088 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:157) [http-nio-8080-exec-3]
- No messages were found in 3 successive fetches. Stopping the consume
process here.
2025-01-16 08:47:11,088 INFO
 (com.test.cr.kafka.services.TestKafkaConsumer:179) [http-nio-8080-exec-3]
- Summary ::: Total number of messages read from offset 16130077 to
16136566 are - 6489. Count of filtered results - 54
2025-01-16 08:47:11,088 INFO
 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1025)
[http-nio-8080-exec-3] - [Consumer
clientId=crt_consumer_http-nio-8080-exec-3, groupId=] Resetting generation
and member id due to: consumer pro-actively leaving the group
2025-01-16 08:47:11,089 INFO
 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1072)
[http-nio-8080-exec-3] - [Consumer
clientId=crt_consumer_http-nio-8080-exec-3, groupId=] Request joining group
due to: consumer pro-actively leaving the group
2025-01-16 08:47:11,138 INFO  (org.apache.kafka.common.metrics.Metrics:693)
[http-nio-8080-exec-3] - Metrics scheduler closed
2025-01-16 08:47:11,139 INFO  (org.apache.kafka.common.metrics.Metrics:697)
[http-nio-8080-exec-3] - Closing reporter
org.apache.kafka.common.metrics.JmxReporter
2025-01-16 08:47:11,139 INFO  (org.apache.kafka.common.metrics.Metrics:703)
[http-nio-8080-exec-3] - Metrics reporters closed
2025-01-16 08:47:11,140 INFO
 (org.apache.kafka.common.utils.AppInfoParser:83) [http-nio-8080-exec-3] -
App info kafka.consumer for crt_consumer_http-nio-8080-exec-3 unregistered
2025-01-16 08:47:11,159 INFO
 (com.cisco.cr.kafka.controller.EventsController:156)
[http-nio-8080-exec-3] - Consume process end.

New code: here we are using the KafkaConsumer API with SASL authentication.

public List<JSONObject> consume(long offset, String topicName,int
partition,CEConsumeRequest inputReq) throws CustomException {
List<JSONObject> msglist = new ArrayList<JSONObject>();

int waitTime = Integer.valueOf(Config
.getConsumerPropValue("pull.wait.time"));
int limit = Integer.valueOf(Config.getConsumerPropValue("pull.size.limit"));
int emptyLoopLimit = Integer.valueOf(Config
.getConsumerPropValue("pull.empty.loop.limit"));
int fetchSize = Integer.valueOf(Config
.getConsumerPropValue("pull.each.fetch.size"));

log.info(
"Consume messages Topic: {} & partition: {}",
topicName, partition);

TopicPartition topicPartition = new TopicPartition(topicName, partition);

long readOffset = offset;
clientName = "crt_consumer_" + Thread.currentThread().getName();
   try (KafkaConsumer<String,String> consumer =
KafkaConsumerFactory.createConsumer(clientName,fetchSize,topicPartition)){
consumer.assign(Collections.singletonList(topicPartition));

         long OffsetStartTime = System.currentTimeMillis();
long OffsetEndTime= System.currentTimeMillis();
long kafkaEarliestOffset=0;
long latestOffset=0;
Admin adminClient = KafkaConsumerFactory.getAdminClient();
           try {
            kafkaEarliestOffset = adminClient.listOffsets(
               Map.of(topicPartition, OffsetSpec.earliest())
            ).all().get().get(topicPartition).offset();
            latestOffset = adminClient.listOffsets(
               Map.of(topicPartition, OffsetSpec.latest())
            ).all().get().get(topicPartition).offset();
OffsetEndTime = System.currentTimeMillis();
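} catch (Exception e) {
// Log through the application logger instead of printing to stderr.
log.error("Failed to fetch earliest/latest offsets", e);
}
           log.info("EarliestOffset :{} LatestOffset :{} Time :{} ms",
kafkaEarliestOffset,latestOffset,(OffsetEndTime - OffsetStartTime));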

           log.info(
    "Latest & Earliest offset in Kafka for topic: {} & partition: {} Latest is {} Earliest is {} and in user input is {}",
    topicName, partition, latestOffset, kafkaEarliestOffset,readOffset);

if (readOffset == 0 && readOffset < kafkaEarliestOffset) {
log.warn("Resetting the offset to earliest available offset in kafka.");
readOffset = kafkaEarliestOffset;
}

boolean end = false;
long startTime = System.currentTimeMillis();
long endTime = System.currentTimeMillis();
int emptyFetchCount = 0;
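// Seek to readOffset (which may have been reset to the earliest offset above), not the raw input offset.
consumer.seek(topicPartition, readOffset);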
do {
JSONObject obj = null;
ConsumerRecords < String, String > records =
consumer.poll(Duration.ofMillis(2000));
log.info("Poll Records Count :{}",records.count());
 for (ConsumerRecord< String, String > consumerRecord: records) {
 long currentOffset = consumerRecord.offset();
 if (currentOffset < readOffset) {
                   log.warn("Found an old offset: {}, Expecting: {}",
currentOffset, readOffset);
                   continue;
               }
 String message = consumerRecord.value();
 log.debug(
"client name : {} , Offset is : {} , Message is : {} ",
clientName, currentOffset,
message);
 CONSUME_LOG.debug(topicName + "\t" + partition + "\t"
+ String.valueOf(currentOffset));
 obj = new JSONObject(message);
if (!inputReq.getApplicationArea().isSubscription()
|| (obj.has("SUBSCRIBER_NAMES") && obj.getString(
"SUBSCRIBER_NAMES").contains(
inputReq.getSender().getReferenceID()))) {
if (inputReq.getApplicationArea().getChangeEventsType() == null
|| (obj.has("NOTIFICATION_EVENT_NAME") && inputReq
.getApplicationArea()
.getChangeEventsType()
.contains(
obj.getString("NOTIFICATION_EVENT_NAME")))) {
msglist.add(obj);
}
}
readOffset = currentOffset + 1;
 }
endTime = System.currentTimeMillis();
if (msglist.size() >= Math.round(limit
/ inputReq.getApplicationArea().getReqInfo().size())
|| (endTime - startTime) >= waitTime) {
log.info(
"Wait condition has been met... exiting the fetch loop. recordCount - {}, time exhausted - {} ms.",
msglist.size(), (endTime - startTime));
end = true;
//consumer.commitSync();
} else if (records.isEmpty()) {
emptyFetchCount++;
try {
if(emptyFetchCount == emptyLoopLimit) {
log.info("No messages were found in 3 successive fetches. Stopping the consume process here.");
end = true;
} else {
Thread.sleep(1000);
}
} catch (InterruptedException ie) {
CONSUME_LOG.warn(ie.getMessage(), ie);
}
} else {
emptyFetchCount = 0;
}
} while (!end);
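
As a rough check on how long the loop above keeps running after the last
record has been read, the worst-case tail wait can be worked out from the
poll timeout and the sleep. This is only a sketch: tailWaitMillis is a
hypothetical helper (not part of the application), emptyLoopLimit = 3 is
assumed from the log message rather than read from the config, and the old
low-level fetch is assumed to return immediately when there is no data.

```java
class EmptyPollTail {
    /**
     * Worst-case time spent before the loop gives up once the partition is
     * drained: each empty attempt may block for up to blockMs, and the loop
     * sleeps sleepMs after every empty attempt except the last one.
     */
    static long tailWaitMillis(int emptyLoopLimit, long blockMs, long sleepMs) {
        return emptyLoopLimit * blockMs + (emptyLoopLimit - 1) * sleepMs;
    }

    public static void main(String[] args) {
        // New loop: poll(Duration.ofMillis(2000)) plus Thread.sleep(1000); limit assumed to be 3.
        System.out.println("new loop tail: " + tailWaitMillis(3, 2000, 1000) + " ms"); // 8000 ms
        // Old loop: empty fetch assumed to return at once (0 ms) plus Thread.sleep(1000).
        System.out.println("old loop tail: " + tailWaitMillis(3, 0, 1000) + " ms");    // 2000 ms
    }
}
```

In the new log the last non-empty poll is at 08:47:03,050 and the loop stops
at 08:47:11,088, about 8.0 seconds later, which is consistent with the
8000 ms estimate. The old log shows about 2.0 seconds for the same phase
(08:46:13,057 to 08:46:15,061).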


The new KafkaConsumer API is very slow. I have tried a lot of settings, such
as increasing MAX_POLL_RECORDS_CONFIG, FETCH_MIN_BYTES_CONFIG etc., but with
no improvement.

Is this timing expected with the new KafkaConsumer API? How can I improve my
timings?
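
To put numbers on the difference, the begin and end timestamps of the two
runs can be subtracted. This is a minimal sketch: RunDurations and
elapsedMillis are hypothetical helper names (not part of the application),
and the four timestamps are copied from the "Consume process begin..." and
"Consume process end." log lines above.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;

class RunDurations {
    // The log timestamps use a comma before the milliseconds, e.g. 08:46:12,380.
    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("HH:mm:ss,SSS");

    /** Milliseconds between two log timestamps taken on the same day. */
    static long elapsedMillis(String begin, String end) {
        LocalTime b = LocalTime.parse(begin, LOG_TIME);
        LocalTime e = LocalTime.parse(end, LOG_TIME);
        return Duration.between(b, e).toMillis();
    }

    public static void main(String[] args) {
        long oldRun = elapsedMillis("08:46:12,380", "08:46:15,081");
        long newRun = elapsedMillis("08:46:56,307", "08:47:11,159");
        System.out.println("old run: " + oldRun + " ms"); // old run: 2701 ms
        System.out.println("new run: " + newRun + " ms"); // new run: 14852 ms
    }
}
```

By that measure the old run took about 2.7 seconds for 6553 messages and the
new run about 14.9 seconds for 6489 messages.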

Please help with this.

Regards,
Giri
