The old implementation is taking upto 2.5 seconds.
new  implementation is taking 10 seconds (10 polls for each poll have 500
records returning)

Authentication happens every time internally?is this delay expected? How to
explain this to management please?



On Thu, Jan 16, 2025 at 10:42 PM giri mungi <girimung...@gmail.com> wrote:

> Hi all,
>
>
> The below are the two implementation logs : 1) old kafka broker with old
> kafka client version (0.8 etc)
> 2) new kafka brokers (3.6.1) with new kafka client version (3.6.1)  .
>
> The old does not have authentication but new kafka we are using
> authentication..we went to new one for security and robustness.
>
>
>
> *Old kafka brokers with old kafka api implementation: Here we are using *
>
> 2025-01-16 08:46:12,380 INFO
>  (com.cisco.cr.kafka.controller.EventsController:98) [http-nio-8080-exec-8]
> - Consume process begin...
> 2025-01-16 08:46:12,380 INFO
>  (com.test.cr.kafka.services.NextgenConsumerDriver:82)
> [http-nio-8080-exec-8] - Validating request...
> 2025-01-16 08:46:12,387 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-8]
> - Latest offset in Kafka for topic: crt_org_ce_ts1crt & partition: 0 is
> 16136630 and in user input is 16130077
> 2025-01-16 08:46:12,387 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:117) [http-nio-8080-exec-8]
> - Earliest offset in Kafka for topic: crt_org_ce_ts1crt & partition: 0 is
> 16129521 and in user input is 16130077
> 2025-01-16 08:46:12,415 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
> - Total size of messages read from offset 16130077 - 1043773 KB.
> 2025-01-16 08:46:12,502 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
> - Total size of messages read from offset 16130372 - 1043496 KB.
> 2025-01-16 08:46:12,543 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
> - Total size of messages read from offset 16130670 - 1044428 KB.
> 2025-01-16 08:46:12,587 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
> - Total size of messages read from offset 16130968 - 1044939 KB.
> 2025-01-16 08:46:12,622 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
> - Total size of messages read from offset 16131265 - 1045057 KB.
> 2025-01-16 08:46:12,650 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
> - Total size of messages read from offset 16131567 - 1042968 KB.
> 2025-01-16 08:46:12,676 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
> - Total size of messages read from offset 16131863 - 1043547 KB.
> 2025-01-16 08:46:12,698 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
> - Total size of messages read from offset 16132161 - 1045497 KB.
> 2025-01-16 08:46:12,723 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
> - Total size of messages read from offset 16132459 - 1043030 KB.
> 2025-01-16 08:46:12,746 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
> - Total size of messages read from offset 16132755 - 1045957 KB.
> 2025-01-16 08:46:12,772 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
> - Total size of messages read from offset 16133054 - 1043704 KB.
> 2025-01-16 08:46:12,789 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
> - Total size of messages read from offset 16133350 - 533178 KB.
> 2025-01-16 08:46:12,805 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
> - Total size of messages read from offset 16133499 - 1042816 KB.
> 2025-01-16 08:46:12,832 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
> - Total size of messages read from offset 16133797 - 1045895 KB.
> 2025-01-16 08:46:12,859 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
> - Total size of messages read from offset 16134093 - 1044584 KB.
> 2025-01-16 08:46:12,884 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
> - Total size of messages read from offset 16134398 - 1045775 KB.
> 2025-01-16 08:46:12,906 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
> - Total size of messages read from offset 16134702 - 1043018 KB.
> 2025-01-16 08:46:12,930 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
> - Total size of messages read from offset 16134999 - 1045228 KB.
> 2025-01-16 08:46:12,952 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
> - Total size of messages read from offset 16135290 - 1044921 KB.
> 2025-01-16 08:46:12,979 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
> - Total size of messages read from offset 16135579 - 1046197 KB.
> 2025-01-16 08:46:13,002 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
> - Total size of messages read from offset 16135876 - 1042978 KB.
> 2025-01-16 08:46:13,029 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
> - Total size of messages read from offset 16136172 - 1045985 KB.
> 2025-01-16 08:46:13,050 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
> - Total size of messages read from offset 16136463 - 600146 KB.
> 2025-01-16 08:46:13,057 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
> - Total size of messages read from offset 16136630 - 0 KB.
> 2025-01-16 08:46:14,059 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
> - Total size of messages read from offset 16136630 - 0 KB.
> 2025-01-16 08:46:15,061 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
> - Total size of messages read from offset 16136630 - 0 KB.
> 2025-01-16 08:46:15,061 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:221) [http-nio-8080-exec-8]
> - No messages were found in 3 successive fetches. Stopping the consume
> process here.
> 2025-01-16 08:46:15,061 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:242) [http-nio-8080-exec-8]
> - Closing consumer... crt_consumer_crt_org_ce_ts1crt_0_Webex-Webex
> 2025-01-16 08:46:15,061 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:249) [http-nio-8080-exec-8]
> - Summary ::: Total number of messages read from offset 16130077 to
> 16136630 are - 6553. Count of filtered results - 54
> 2025-01-16 08:46:15,081 INFO
>  (com.cisco.cr.kafka.controller.EventsController:154)
> [http-nio-8080-exec-8] - Consume process end.
>
> old code: In old code we were using
> FetchRequest,FetchResponse,SimpleConsumer without authentication and this
> is low level API
>
>
> public class ChangeEventsKafkaConsumer{
>
>
> private CEConsumeResponse ceResponse;
>
> public ChangeEventsKafkaConsumer() {
> super();
> }
>
> public ChangeEventsKafkaConsumer(ConfigBean confBean,
> CEConsumeResponse res, long offset) {
> super(confBean);
> this.ceResponse = res;
> this.offset = offset;
> }
>
> @Override
> public long readMessages(long a_maxRead, String a_topic, int a_partition,
> List<String> ng_replicaBrokers, int a_port) throws CustomException {
> return 0l;
> }
>
> public List<JSONObject> consume(long offset, String a_topic,
> int a_partition, List<String> ng_replicaBrokers, int a_port,
> CEConsumeRequest inputReq) throws CustomException {
> List<JSONObject> msglist = new ArrayList<JSONObject>();
>
> int waitTime = Integer.valueOf(Config
> .getConsumerPropValue("pull.wait.time"));
> int limit =
> Integer.valueOf(Config.getConsumerPropValue("pull.size.limit"));
> int fetchSize = Integer.valueOf(Config
> .getConsumerPropValue("pull.each.fetch.size"));
> int emptyLoopLimit = Integer.valueOf(Config
> .getConsumerPropValue("pull.empty.loop.limit"));
>
> /*
> * Fetching Metadata.
> */
> PartitionMetadata metadata = findLeader(ng_replicaBrokers, a_port,
> a_topic, a_partition);
> if (metadata == null) {
> log.error("Can't find metadata for Topic and Partition.");
> throw new CustomException(
> "Can't find metadata for Topic and Partition.");
> }
> if (metadata.leader() == null) {
> log.error("Can't find Leader for Topic and Partition.");
> throw new CustomException(
> "Can't find Leader for Topic and Partition.");
> }
>
> /*
> * Getting lead broker.
> */
> String leadBroker = metadata.leader().host();
> log.debug("Lead broker for Partion : {}, Topic :{} is {}", a_partition,
> a_topic, leadBroker);
> SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port,
> configBean.getConnectionTimeOut(),
> configBean.getKafkaBufferSize(), clientName);
>
> /*
> * Fetching kafka offset from Kafka
> */
> long kafkaOffset = getLastOffset(consumer, a_topic, a_partition,
> kafka.api.OffsetRequest.LatestTime(), clientName);
>
> /*
> * Getting earliest available offset in kafka.
> */
> long kafkaEarliestOffset = getLastOffset(consumer, a_topic,
> a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
>
> /*
> * User input offset.
> */
> long readOffset = offset;
>
> log.info(
> "Latest offset in Kafka for topic: {} & partition: {} is {} and in user
> input is {}",
> a_topic, a_partition, kafkaOffset, readOffset);
> log.info(
> "Earliest offset in Kafka for topic: {} & partition: {} is {} and in user
> input is {}",
> a_topic, a_partition, kafkaEarliestOffset, readOffset);
>
> if (readOffset == 0 && readOffset < kafkaEarliestOffset) {
> log.warn("Resetting the offset to earliest available offset in kafka.");
> readOffset = kafkaEarliestOffset;
> }
>
> boolean end = false;
> long startTime = Calendar.getInstance().getTimeInMillis();
> long endTime = Calendar.getInstance().getTimeInMillis();
> int emptyFetchCount = 0;
> do {
> FetchRequest req = new FetchRequestBuilder().clientId(clientName)
> .addFetch(a_topic, a_partition, readOffset, fetchSize)
> .build();
>
> FetchResponse fetchResponse = consumer.fetch(req);
>
>
>
> /*
> * Reading data from kafka response.
> */
> JSONObject obj = null;
> ByteBufferMessageSet set = fetchResponse.messageSet(a_topic,
> a_partition);
> log.info("Total size of messages read from offset {} - {} KB.",
> readOffset, set.sizeInBytes());
> for (MessageAndOffset messageAndOffset : set) {
> String message = null;
> if (messageAndOffset.offset() < readOffset) {
> log.warn("Found an old offset: {} ,Expecting: {}",
> messageAndOffset.offset(), readOffset);
> continue;
> }
>
> ByteBuffer payload = messageAndOffset.message().payload();
>
> byte[] bytes = new byte[payload.limit()];
> payload.get(bytes);
> try {
> message = new String(bytes, "UTF-8");
> } catch (UnsupportedEncodingException ue) {
> log.warn(ue.getMessage(), ue);
> message = new String(bytes);
> }
> log.debug(
> "client name : {} , Offset is : {} , Message is : {} ",
> clientName, String.valueOf(messageAndOffset.offset()),
> message);
> CONSUME_LOG.debug(a_topic + "\t" + a_partition + "\t"
> + String.valueOf(messageAndOffset.offset()));
> obj = new JSONObject(message);
> if (!inputReq.getApplicationArea().isSubscription()
> || (obj.has("SUBSCRIBER_NAMES") && obj.getString(
> "SUBSCRIBER_NAMES").contains(
> inputReq.getSender().getReferenceID()))) {
> if (inputReq.getApplicationArea().getChangeEventsType() == null
> || (obj.has("NOTIFICATION_EVENT_NAME") && inputReq
> .getApplicationArea()
> .getChangeEventsType()
> .contains(
> obj.getString("NOTIFICATION_EVENT_NAME")))) {
> msglist.add(new JSONObject(message));
> }
> }
> readOffset = messageAndOffset.nextOffset();
> }
> endTime = Calendar.getInstance().getTimeInMillis();
> if (msglist.size() >= Math.round(limit
> / inputReq.getApplicationArea().getReqInfo().size())
> || (endTime - startTime) >= waitTime) {
> log.info(
> "Wait condition has been met... exiting the fetch loop. recordCount - {},
> time exhausted - {} ms.",
> msglist.size(), (endTime - startTime));
> end = true;
> } else if (set.sizeInBytes() == 0) {
> emptyFetchCount++;
> try {
> if(emptyFetchCount == emptyLoopLimit) {
> log.info("No messages were found in 3 successive fetches. Stopping the
> consume process here.");
> end = true;
> } else {
> Thread.sleep(1000);
> }
> } catch (InterruptedException ie) {
> CONSUME_LOG.warn(ie.getMessage(), ie);
> }
> } else {
> emptyFetchCount = 0;
> }
> req = null;
> } while (!end);
>
> *New kafkaconsumer implementation with new kafka brokers:*
>
>
> 2025-01-16 08:46:56,307 INFO
>  (com.test.cr.kafka.controller.EventsController:99) [http-nio-8080-exec-3]
> - Consume process begin...
> 2025-01-16 08:46:56,307 INFO
>  (com.test.cr.kafka.services.NextgenConsumerDriver:76)
> [http-nio-8080-exec-3] - Validating request...
> 2025-01-16 08:46:56,307 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:68) [http-nio-8080-exec-3] -
> Consume messages Topic: ORG_CHANGE_EVENT_YS1_18_OCT_2024 & partition: 0
> 2025-01-16 08:46:56,307 INFO
>  (com.test.cr.kafka.services.KafkaConsumerFactory:45)
> [http-nio-8080-exec-3] - KafkaUser:app_client kafkaBrokerStr:
> ys1kafka-aln-01.test.com:9092,ys1kafka-aln-02.cisco.com:9092,
> ys1kafka-rdn-01.test.com:9092,ys1kafka-rdn-02.test.com:9092
> GroupID:crt_org_change_events_lt
> 2025-01-16 08:46:56,308 INFO
>  (org.apache.kafka.clients.consumer.ConsumerConfig:370)
> [http-nio-8080-exec-3] - ConsumerConfig values:
>         allow.auto.create.topics = true
>         auto.commit.interval.ms = 5000
>         auto.include.jmx.reporter = true
>         auto.offset.reset = none
>         bootstrap.servers = [kafka-aln-01.test.com:9092,
> kafka-aln-02.test.com:9092, kafka-rdn-01.test.com:9092,
> kafka-rdn-02.test.com:9092]
>         check.crcs = true
>         client.dns.lookup = use_all_dns_ips
>         client.id = crt_consumer_http-nio-8080-exec-3
>         client.rack =
>         connections.max.idle.ms = 540000
>         default.api.timeout.ms = 60000
>         enable.auto.commit = false
>         exclude.internal.topics = true
>         fetch.max.bytes = 52428800
>         fetch.max.wait.ms = 10
>         fetch.min.bytes = 1
>         group.id =
>         group.instance.id = null
>         heartbeat.interval.ms = 3000
>         interceptor.classes = []
>         internal.leave.group.on.close = true
>         internal.throw.on.fetch.stable.offset.unsupported = false
>         isolation.level = read_uncommitted
>         key.deserializer = class
> org.apache.kafka.common.serialization.StringDeserializer
>         max.partition.fetch.bytes = 1048576
>         max.poll.interval.ms = 300000
>         max.poll.records = 500
>         metadata.max.age.ms = 300000
>         metric.reporters = []
>         metrics.num.samples = 2
>         metrics.recording.level = INFO
>         metrics.sample.window.ms = 30000
>         partition.assignment.strategy = [class
> org.apache.kafka.clients.consumer.RangeAssignor, class
> org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
>         receive.buffer.bytes = 65536
>         reconnect.backoff.max.ms = 1000
>         reconnect.backoff.ms = 50
>         request.timeout.ms = 30000
>         retry.backoff.ms = 100
>         sasl.client.callback.handler.class = null
>         sasl.jaas.config = [hidden]
>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>         sasl.kerberos.min.time.before.relogin = 60000
>         sasl.kerberos.service.name = null
>         sasl.kerberos.ticket.renew.jitter = 0.05
>         sasl.kerberos.ticket.renew.window.factor = 0.8
>         sasl.login.callback.handler.class = null
>         sasl.login.class = null
>         sasl.login.connect.timeout.ms = null
>         sasl.login.read.timeout.ms = null
>         sasl.login.refresh.buffer.seconds = 300
>         sasl.login.refresh.min.period.seconds = 60
>         sasl.login.refresh.window.factor = 0.8
>         sasl.login.refresh.window.jitter = 0.05
>         sasl.login.retry.backoff.max.ms = 10000
>         sasl.login.retry.backoff.ms = 100
>         sasl.mechanism = PLAIN
>         sasl.oauthbearer.clock.skew.seconds = 30
>         sasl.oauthbearer.expected.audience = null
>         sasl.oauthbearer.expected.issuer = null
>         sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
>         sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
>         sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
>         sasl.oauthbearer.jwks.endpoint.url = null
>         sasl.oauthbearer.scope.claim.name = scope
>         sasl.oauthbearer.sub.claim.name = sub
>         sasl.oauthbearer.token.endpoint.url = null
>         security.protocol = SASL_SSL
>         security.providers = null
>         send.buffer.bytes = 131072
>         session.timeout.ms = 45000
>         socket.connection.setup.timeout.max.ms = 30000
>         socket.connection.setup.timeout.ms = 10000
>         ssl.cipher.suites = null
>         ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
>         ssl.endpoint.identification.algorithm = https
>         ssl.engine.factory.class = null
>         ssl.key.password = null
>         ssl.keymanager.algorithm = SunX509
>         ssl.keystore.certificate.chain = null
>         ssl.keystore.key = null
>         ssl.keystore.location = null
>         ssl.keystore.password = null
>         ssl.keystore.type = JKS
>         ssl.protocol = TLSv1.3
>         ssl.provider = null
>         ssl.secure.random.implementation = null
>         ssl.trustmanager.algorithm = PKIX
>         ssl.truststore.certificates = null
>         ssl.truststore.location = null
>         ssl.truststore.password = null
>         ssl.truststore.type = JKS
>         value.deserializer = class
> org.apache.kafka.common.serialization.StringDeserializer
>
> 2025-01-16 08:46:56,308 WARN
>  (org.apache.kafka.clients.consumer.KafkaConsumer:688)
> [http-nio-8080-exec-3] - [Consumer
> clientId=crt_consumer_http-nio-8080-exec-3, groupId=] Support for using the
> empty group id by consumers is deprecated and will be removed in the next
> major release.
> 2025-01-16 08:46:56,330 INFO
>  (org.apache.kafka.common.utils.AppInfoParser:119) [http-nio-8080-exec-3] -
> Kafka version: 3.6.1
> 2025-01-16 08:46:56,330 INFO
>  (org.apache.kafka.common.utils.AppInfoParser:120) [http-nio-8080-exec-3] -
> Kafka commitId: 5e3c2b738d253ff5
> 2025-01-16 08:46:56,330 INFO
>  (org.apache.kafka.common.utils.AppInfoParser:121) [http-nio-8080-exec-3] -
> Kafka startTimeMs: 1737046016330
> 2025-01-16 08:46:56,330 INFO
>  (org.apache.kafka.clients.consumer.KafkaConsumer:1067)
> [http-nio-8080-exec-3] - [Consumer
> clientId=crt_consumer_http-nio-8080-exec-3, groupId=] Assigned to
> partition(s): ORG_CHANGE_EVENT_YS1_18_OCT_2024-0
> 2025-01-16 08:46:56,330 INFO
>  (org.apache.kafka.clients.consumer.KafkaConsumer:1067)
> [http-nio-8080-exec-3] - [Consumer
> clientId=crt_consumer_http-nio-8080-exec-3, groupId=] Assigned to
> partition(s): ORG_CHANGE_EVENT_YS1_18_OCT_2024-0
> 2025-01-16 08:46:56,832 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:95) [http-nio-8080-exec-3] -
> EarliestOffset :0 LatestOffset :16136566 Time :502 ms
> 2025-01-16 08:46:56,832 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:97) [http-nio-8080-exec-3] -
> Latest & Earliest offset in Kafka for topic:
> ORG_CHANGE_EVENT_YS1_18_OCT_2024 & partition: 0 Latest is 16136566 Earliest
> is 0 and in user input is 16130077
> 2025-01-16 08:46:56,832 INFO
>  (org.apache.kafka.clients.consumer.KafkaConsumer:1564)
> [http-nio-8080-exec-3] - [Consumer
> clientId=crt_consumer_http-nio-8080-exec-3, groupId=] Seeking to offset
> 16130077 for partition ORG_CHANGE_EVENT_YS1_18_OCT_2024-0
> 2025-01-16 08:46:57,157 INFO  (org.apache.kafka.clients.Metadata:287)
> [http-nio-8080-exec-3] - [Consumer
> clientId=crt_consumer_http-nio-8080-exec-3, groupId=] Cluster ID:
> WyVVmQEwTbeRp73DYJgk7Q
> 2025-01-16 08:46:58,741 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
> - Poll Records Count :500
> 2025-01-16 08:46:58,823 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
> - Poll Records Count :500
> 2025-01-16 08:46:58,898 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
> - Poll Records Count :500
> 2025-01-16 08:46:58,964 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
> - Poll Records Count :188
> 2025-01-16 08:47:00,243 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
> - Poll Records Count :500
> 2025-01-16 08:47:00,316 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
> - Poll Records Count :500
> 2025-01-16 08:47:00,383 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
> - Poll Records Count :500
> 2025-01-16 08:47:00,455 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
> - Poll Records Count :235
> 2025-01-16 08:47:01,735 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
> - Poll Records Count :500
> 2025-01-16 08:47:01,799 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
> - Poll Records Count :500
> 2025-01-16 08:47:01,966 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
> - Poll Records Count :500
> 2025-01-16 08:47:02,035 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
> - Poll Records Count :294
> 2025-01-16 08:47:02,912 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
> - Poll Records Count :500
> 2025-01-16 08:47:02,981 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
> - Poll Records Count :500
> 2025-01-16 08:47:03,050 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
> - Poll Records Count :272
> 2025-01-16 08:47:05,087 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
> - Poll Records Count :0
> 2025-01-16 08:47:08,088 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
> - Poll Records Count :0
> 2025-01-16 08:47:11,088 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
> - Poll Records Count :0
> 2025-01-16 08:47:11,088 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:157) [http-nio-8080-exec-3]
> - No messages were found in 3 successive fetches. Stopping the consume
> process here.
> 2025-01-16 08:47:11,088 INFO
>  (com.test.cr.kafka.services.TestKafkaConsumer:179) [http-nio-8080-exec-3]
> - Summary ::: Total number of messages read from offset 16130077 to
> 16136566 are - 6489. Count of filtered results - 54
> 2025-01-16 08:47:11,088 INFO
>  (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1025)
> [http-nio-8080-exec-3] - [Consumer
> clientId=crt_consumer_http-nio-8080-exec-3, groupId=] Resetting generation
> and member id due to: consumer pro-actively leaving the group
> 2025-01-16 08:47:11,089 INFO
>  (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1072)
> [http-nio-8080-exec-3] - [Consumer
> clientId=crt_consumer_http-nio-8080-exec-3, groupId=] Request joining group
> due to: consumer pro-actively leaving the group
> 2025-01-16 08:47:11,138 INFO
>  (org.apache.kafka.common.metrics.Metrics:693) [http-nio-8080-exec-3] -
> Metrics scheduler closed
> 2025-01-16 08:47:11,139 INFO
>  (org.apache.kafka.common.metrics.Metrics:697) [http-nio-8080-exec-3] -
> Closing reporter org.apache.kafka.common.metrics.JmxReporter
> 2025-01-16 08:47:11,139 INFO
>  (org.apache.kafka.common.metrics.Metrics:703) [http-nio-8080-exec-3] -
> Metrics reporters closed
> 2025-01-16 08:47:11,140 INFO
>  (org.apache.kafka.common.utils.AppInfoParser:83) [http-nio-8080-exec-3] -
> App info kafka.consumer for crt_consumer_http-nio-8080-exec-3 unregistered
> 2025-01-16 08:47:11,159 INFO
>  (com.cisco.cr.kafka.controller.EventsController:156)
> [http-nio-8080-exec-3] - Consume process end.
>
> New code:Here we are using KafkaConsumer API with SASL authentication
>
> public List<JSONObject> consume(long offset, String topicName,int
> partition,CEConsumeRequest inputReq) throws CustomException {
> List<JSONObject> msglist = new ArrayList<JSONObject>();
>
> int waitTime = Integer.valueOf(Config
> .getConsumerPropValue("pull.wait.time"));
> int limit =
> Integer.valueOf(Config.getConsumerPropValue("pull.size.limit"));
> int emptyLoopLimit = Integer.valueOf(Config
> .getConsumerPropValue("pull.empty.loop.limit"));
> int fetchSize = Integer.valueOf(Config
> .getConsumerPropValue("pull.each.fetch.size"));
>
> log.info(
> "Consume messages Topic: {} & partition: {}",
> topicName, partition);
>
> TopicPartition topicPartition = new TopicPartition(topicName, partition);
>
> long readOffset = offset;
> clientName = "crt_consumer_" + Thread.currentThread().getName();
>    try (KafkaConsumer<String,String> consumer =
> KafkaConsumerFactory.createConsumer(clientName,fetchSize,topicPartition)){
> consumer.assign(Collections.singletonList(topicPartition));
>
>          long OffsetStartTime = System.currentTimeMillis();
> long OffsetEndTime= System.currentTimeMillis();
> long kafkaEarliestOffset=0;
> long latestOffset=0;
> Admin adminClient = KafkaConsumerFactory.getAdminClient();
>            try {
>             kafkaEarliestOffset = adminClient.listOffsets(
>                Map.of(topicPartition, OffsetSpec.earliest())
>             ).all().get().get(topicPartition).offset();
>             latestOffset = adminClient.listOffsets(
>                Map.of(topicPartition, OffsetSpec.latest())
>             ).all().get().get(topicPartition).offset();
> OffsetEndTime = System.currentTimeMillis();
> } catch (Exception e) {
> e.printStackTrace();
> }
>            log.info("EarliestOffset :{} LatestOffset :{} Time :{}
> ms",kafkaEarliestOffset,latestOffset,(OffsetEndTime - OffsetStartTime));
>
>            log.info(
>     "Latest & Earliest offset in Kafka for topic: {} & partition: {}
> Latest is {} Earliest is {} and in user input is {}",
>     topicName, partition, latestOffset, kafkaEarliestOffset,readOffset);
>
> if (readOffset == 0 && readOffset < kafkaEarliestOffset) {
> log.warn("Resetting the offset to earliest available offset in kafka.");
> readOffset = kafkaEarliestOffset;
> }
>
> boolean end = false;
> long startTime = System.currentTimeMillis();
> long endTime = System.currentTimeMillis();
> int emptyFetchCount = 0;
> consumer.seek(topicPartition, offset);
> do {
> JSONObject obj = null;
> ConsumerRecords < String, String > records =
> consumer.poll(Duration.ofMillis(2000));
> log.info("Poll Records Count :{}",records.count());
>  for (ConsumerRecord< String, String > consumerRecord: records) {
>  long currentOffset = consumerRecord.offset();
>  if (currentOffset < readOffset) {
>                    log.warn("Found an old offset: {}, Expecting: {}",
> currentOffset, readOffset);
>                    continue;
>                }
>  String message = consumerRecord.value();
>  log.debug(
> "client name : {} , Offset is : {} , Message is : {} ",
> clientName, readOffset,
> message);
>  CONSUME_LOG.debug(topicName + "\t" + partition + "\t"
> + String.valueOf(currentOffset));
>  obj = new JSONObject(message);
> if (!inputReq.getApplicationArea().isSubscription()
> || (obj.has("SUBSCRIBER_NAMES") && obj.getString(
> "SUBSCRIBER_NAMES").contains(
> inputReq.getSender().getReferenceID()))) {
> if (inputReq.getApplicationArea().getChangeEventsType() == null
> || (obj.has("NOTIFICATION_EVENT_NAME") && inputReq
> .getApplicationArea()
> .getChangeEventsType()
> .contains(
> obj.getString("NOTIFICATION_EVENT_NAME")))) {
> msglist.add(obj);
> }
> }
> readOffset = currentOffset + 1;
>  }
> endTime = System.currentTimeMillis();
> if (msglist.size() >= Math.round(limit
> / inputReq.getApplicationArea().getReqInfo().size())
> || (endTime - startTime) >= waitTime) {
> log.info(
> "Wait condition has been met... exiting the fetch loop. recordCount - {},
> time exhausted - {} ms.",
> msglist.size(), (endTime - startTime));
> end = true;
> //consumer.commitSync();
> } else if (records.isEmpty()) {
> emptyFetchCount++;
> try {
> if(emptyFetchCount == emptyLoopLimit) {
> log.info("No messages were found in 3 successive fetches. Stopping the
> consume process here.");
> end = true;
> } else {
> Thread.sleep(1000);
> }
> } catch (InterruptedException ie) {
> CONSUME_LOG.warn(ie.getMessage(), ie);
> }
> } else {
> emptyFetchCount = 0;
> }
> } while (!end);
>
>
> The new kafka consumer API is very slow..i have tried lot of settings like
> increasing MAX_POLL_RECORDS_CONFIG,FETCH_MIN_BYTES_CONFIG etc..but no use.
>
> Is this time expected with the new KafkaConsumer API.How to improve my
> timings.
>
> please help on this.
>
> Regards,
> Giri
>

Reply via email to