Hi All,

I am trying to make a hello world example for Transactional Producer and trying 
to consume.
I am doing all this in plain java.

I can produce but consumer is not consuming message.

I searched over other places and I found some people have same problem.

Right now, I am using single broker. I tried same with 3 brokers also and it 
was not working at that time also.

I don’t know what I am missing and where… :p in Consumer I am missing something 
or in producer.

I have attached Producer and Consumer codes and console logs with my broker logs

Thanks,
Abhishek


My Broker logs after producing messages
<LOGS broker>
[2017-11-01 18:45:55,000] INFO Updated PartitionLeaderEpoch. New: {epoch:4, 
offset:3}, Current: {epoch:3, offset0} for Partition: __transaction_state-2. 
Cache now contains 1 entries. (kafka.server.epoch.LeaderEpochFileCache)
[2017-11-01 18:46:03,482] INFO [Transaction Coordinator 1001]: Initialized 
transactionalId TXN_ID:0.5031925219291776-156417066 with producerId 4001 and 
producer epoch 0 on partition __transaction_state-2 
(kafka.coordinator.transaction.TransactionCoordinator)
</LOGS broker>

My producer code is
<CODE producer>
import com.example.transaction.producer.utils.DataObject;
import com.example.transaction.producer.utils.serde.JsonSerializer;
import kafka.common.AuthorizationException;
import kafka.common.KafkaException;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

import java.util.*;

public class SampleProducer {

    public static String topic = "topic-4";

    public static void main(String[] args) {

        Properties configProperties = new Properties();

        //configProperties.put(ProducerConfig.CLIENT_ID_CONFIG, 
"some-client-id");
        configProperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "TXN_ID:" 
+ new Random().nextDouble() + new Random().nextInt());
        configProperties.put("acks", "all");
        configProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        configProperties.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        configProperties.put("key.serializer", 
"org.apache.kafka.common.serialization.IntegerSerializer");
        configProperties.put("value.serializer", JsonSerializer.class);
        configProperties.put("bootstrap.servers", 
"192.168.41.132:9090<http://192.168.41.132:9090>");


        KafkaProducer<Integer, DataObject>producer = new 
KafkaProducer<>(configProperties);

        System.out.println("Init Transaction");
        producer.initTransactions();
        try {

            System.out.println("transaction initialised going to begin 
transaction");
            producer.beginTransaction();
            System.out.println("Transaction started");

            ProducerRecord rec = new ProducerRecord(topic, 5, new DataObject(5, 
"Hello, World!"));

            RecordMetadata metadata = (RecordMetadata) producer.send(rec).get();
            System.out.println("The offset of the record we just sent is: " + 
metadata.offset());

            metadata = (RecordMetadata) producer.send(rec).get();
            System.out.println("The offset of the record we just sent is: " + 
metadata.offset());

            producer.commitTransaction();
            System.out.println("Transaction Committed");

        }catch (ProducerFencedException | OutOfOrderSequenceException | 
AuthorizationException e){
            // We can't recover from these exceptions, so our only option is to 
close the producer and exit.
            System.out.println("Connection closed but commit failed. We can't 
recover");
            producer.close();
        }catch(KafkaException e) {
            // For all other exceptions, just abort the transaction and try 
again.
            System.out.println("Abort Transaction");
            producer.abortTransaction();
        }catch (Exception x){}
        producer.close();
        System.out.println("Closed");
    }
}


</CODE proucer>

These are my producer console logs

<LOGS producer>
0    [main] INFO  org.apache.kafka.clients.producer.ProducerConfig  - 
ProducerConfig values:
acks = all
batch.size = 16384
bootstrap.servers = [192.168.41.132:9090<http://192.168.41.132:9090>]
buffer.memory = 33554432
client.id<http://client.id> =
compression.type = none
connections.max.idle.ms<http://connections.max.idle.ms> = 540000
enable.idempotence = true
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer
linger.ms<http://linger.ms> = 0
max.block.ms<http://max.block.ms> = 60000
max.in<http://max.in>.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms<http://metadata.max.age.ms> = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms<http://metrics.sample.window.ms> = 30000
partitioner.class = class 
org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms<http://reconnect.backoff.max.ms> = 1000
reconnect.backoff.ms<http://reconnect.backoff.ms> = 50
request.timeout.ms<http://request.timeout.ms> = 30000
retries = 2147483647
retry.backoff.ms<http://retry.backoff.ms> = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name<http://sasl.kerberos.service.name> = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms<http://transaction.timeout.ms> = 60000
transactional.id<http://transactional.id> = TXN_ID:0.5031925219291776-156417066
value.serializer = class 
com.example.transaction.producer.utils.serde.JsonSerializer

261  [main] INFO  org.apache.kafka.clients.producer.KafkaProducer  - 
Instantiated a transactional producer.
265  [main] INFO  org.apache.kafka.clients.producer.KafkaProducer  - Overriding 
the default max.in<http://max.in>.flight.requests.per.connection to 1 since 
idempontence is enabled.
274  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name bufferpool-wait-time
281  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name buffer-exhausted-records
284  [main] DEBUG org.apache.kafka.clients.Metadata  - Updated cluster metadata 
version 1 to Cluster(id = null, nodes = 
[192.168.41.132:9090<http://192.168.41.132:9090> (id: -1 rack: null)], 
partitions = [])
297  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name produce-throttle-time
564  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name connections-closed:
564  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name connections-created:
565  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name bytes-sent-received:
565  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name bytes-sent:
565  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name bytes-received:
566  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name select-time:
567  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name io-time:
573  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name batch-size
574  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name compression-rate
574  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name queue-time
575  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name request-time
575  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name records-per-request
577  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name record-retries
577  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name errors
577  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name record-size-max
579  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name batch-split-rate
582  [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.Sender  - Starting Kafka producer 
I/O thread.
585  [main] INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version 
: 0.11.0.0
585  [main] INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId 
: cb8625948210849f
586  [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer  - Kafka 
producer started
Init Transaction
588  [main] DEBUG 
org.apache.kafka.clients.producer.internals.TransactionManager  - 
[TransactionalId TXN_ID:0.5031925219291776-156417066] Transition from state 
UNINITIALIZED to INITIALIZING
588  [main] INFO  
org.apache.kafka.clients.producer.internals.TransactionManager  - 
[TransactionalId TXN_ID:0.5031925219291776-156417066] ProducerId set to -1 with 
epoch -1
594  [main] DEBUG 
org.apache.kafka.clients.producer.internals.TransactionManager  - 
[TransactionalId TXN_ID:0.5031925219291776-156417066] Enqueuing transactional 
request (type=InitProducerIdRequest, 
transactionalId=TXN_ID:0.5031925219291776-156417066, transactionTimeoutMs=60000)
598  [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.TransactionManager  - 
[TransactionalId TXN_ID:0.5031925219291776-156417066] Enqueuing transactional 
request (type=FindCoordinatorRequest, 
coordinatorKey=TXN_ID:0.5031925219291776-156417066, coordinatorType=TRANSACTION)
598  [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.TransactionManager  - 
[TransactionalId TXN_ID:0.5031925219291776-156417066] Enqueuing transactional 
request (type=InitProducerIdRequest, 
transactionalId=TXN_ID:0.5031925219291776-156417066, transactionTimeoutMs=60000)
712  [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.NetworkClient  - Initiating connection to node -1 at 
192.168.41.132:9090<http://192.168.41.132:9090>.
712  [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics  - Added sensor with name 
node--1.bytes-sent
712  [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics  - Added sensor with name 
node--1.bytes-received
712  [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics  - Added sensor with name 
node--1.latency
712  [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.network<http://org.apache.kafka.common.network>.Selector
  - Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 
to node -1
712  [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.NetworkClient  - Completed connection to node -1.  
Fetching API versions.
712  [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.NetworkClient  - Initiating API versions fetch from 
node -1.
897  [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.NetworkClient  - Recorded API versions for node -1: 
(Produce(0): 0 to 3 [usable: 3], Fetch(1): 0 to 5 [usable: 5], Offsets(2): 0 to 
2 [usable: 2], Metadata(3): 0 to 4 [usable: 4], LeaderAndIsr(4): 0 [usable: 0], 
StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], 
ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], 
OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], 
JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], 
LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], 
DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], 
SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 to 1 [usable: 1], 
CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], 
DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], 
OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], 
AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], 
WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], 
DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 
0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 
0])
898  [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.Sender  - [TransactionalId 
TXN_ID:0.5031925219291776-156417066] Sending transactional request 
(type=FindCoordinatorRequest, 
coordinatorKey=TXN_ID:0.5031925219291776-156417066, 
coordinatorType=TRANSACTION) to node 
192.168.41.132:9090<http://192.168.41.132:9090> (id: -1 rack: null)
901  [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.NetworkClient  - Initiating connection to node 1001 at 
192.168.41.132:9090<http://192.168.41.132:9090>.
901  [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics  - Added sensor with name 
node-1001.bytes-sent
902  [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics  - Added sensor with name 
node-1001.bytes-received
902  [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics  - Added sensor with name 
node-1001.latency
903  [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.network<http://org.apache.kafka.common.network>.Selector
  - Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 
to node 1001
903  [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.NetworkClient  - Completed connection to node 1001.  
Fetching API versions.
903  [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.NetworkClient  - Initiating API versions fetch from 
node 1001.
905  [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.NetworkClient  - Recorded API versions for node 1001: 
(Produce(0): 0 to 3 [usable: 3], Fetch(1): 0 to 5 [usable: 5], Offsets(2): 0 to 
2 [usable: 2], Metadata(3): 0 to 4 [usable: 4], LeaderAndIsr(4): 0 [usable: 0], 
StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], 
ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], 
OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], 
JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], 
LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], 
DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], 
SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 to 1 [usable: 1], 
CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], 
DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], 
OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], 
AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], 
WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], 
DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 
0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 
0])
1009 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.Sender  - [TransactionalId 
TXN_ID:0.5031925219291776-156417066] Sending transactional request 
(type=InitProducerIdRequest, 
transactionalId=TXN_ID:0.5031925219291776-156417066, 
transactionTimeoutMs=60000) to node 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
9491 [kafka-producer-network-thread | producer-1] INFO  
org.apache.kafka.clients.producer.internals.TransactionManager  - 
[TransactionalId TXN_ID:0.5031925219291776-156417066] ProducerId set to 4001 
with epoch 0
9491 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.TransactionManager  - 
[TransactionalId TXN_ID:0.5031925219291776-156417066] Transition from state 
INITIALIZING to READY
transaction initialised going to begin transaction
9491 [main] DEBUG 
org.apache.kafka.clients.producer.internals.TransactionManager  - 
[TransactionalId TXN_ID:0.5031925219291776-156417066] Transition from state 
READY to IN_TRANSACTION
Transaction started
9491 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.NetworkClient  - Sending metadata request 
(type=MetadataRequest, topics=topic-4) to node -1
9491 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.Metadata  - Updated cluster metadata version 2 to 
Cluster(id = 0WtNXiFvT_C6V9Uo1zPGVQ, nodes = 
[192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)], 
partitions = [Partition(topic = topic-4, partition = 0, leader = 1001, replicas 
= [1001], isr = [1001])])
9523 [main] DEBUG 
org.apache.kafka.clients.producer.internals.TransactionManager  - 
[TransactionalId TXN_ID:0.5031925219291776-156417066] Begin adding new 
partition topic-4-0 to transaction
9523 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.TransactionManager  - 
[TransactionalId TXN_ID:0.5031925219291776-156417066] Enqueuing transactional 
request (type=AddPartitionsToTxnRequest, 
transactionalId=TXN_ID:0.5031925219291776-156417066, producerId=4001, 
producerEpoch=0, partitions=[topic-4-0])
9523 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.Sender  - [TransactionalId 
TXN_ID:0.5031925219291776-156417066] Sending transactional request 
(type=AddPartitionsToTxnRequest, 
transactionalId=TXN_ID:0.5031925219291776-156417066, producerId=4001, 
producerEpoch=0, partitions=[topic-4-0]) to node 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
9538 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.TransactionManager  - 
[TransactionalId TXN_ID:0.5031925219291776-156417066] Successfully added 
partitions [topic-4-0] to transaction
9538 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.RecordAccumulator  - Assigning 
sequence number 0 from producer (producerId=4001, epoch=0) to dequeued batch 
from partition topic-4-0 bound for 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null).
9538 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics  - Added sensor with name 
topic.topic-4.records-per-batch
9538 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics  - Added sensor with name 
topic.topic-4.bytes
9538 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics  - Added sensor with name 
topic.topic-4.compression-rate
9538 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics  - Added sensor with name 
topic.topic-4.record-retries
9538 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics  - Added sensor with name 
topic.topic-4.record-errors
9538 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.Sender  - Incremented sequence 
number for topic-partition topic-4-0 to 1
The offset of the record we just sent is: 5
9554 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.RecordAccumulator  - Assigning 
sequence number 1 from producer (producerId=4001, epoch=0) to dequeued batch 
from partition topic-4-0 bound for 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null).
9554 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.Sender  - Incremented sequence 
number for topic-partition topic-4-0 to 2
The offset of the record we just sent is: 6
9554 [main] DEBUG 
org.apache.kafka.clients.producer.internals.TransactionManager  - 
[TransactionalId TXN_ID:0.5031925219291776-156417066] Transition from state 
IN_TRANSACTION to COMMITTING_TRANSACTION
9554 [main] DEBUG 
org.apache.kafka.clients.producer.internals.TransactionManager  - 
[TransactionalId TXN_ID:0.5031925219291776-156417066] Enqueuing transactional 
request (type=EndTxnRequest, 
transactionalId=TXN_ID:0.5031925219291776-156417066, producerId=4001, 
producerEpoch=0, result=COMMIT)
9554 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.Sender  - [TransactionalId 
TXN_ID:0.5031925219291776-156417066] Sending transactional request 
(type=EndTxnRequest, transactionalId=TXN_ID:0.5031925219291776-156417066, 
producerId=4001, producerEpoch=0, result=COMMIT) to node 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
9554 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.TransactionManager  - 
[TransactionalId TXN_ID:0.5031925219291776-156417066] Transition from state 
COMMITTING_TRANSACTION to READY
Transaction Committed
9554 [main] INFO  org.apache.kafka.clients.producer.KafkaProducer  - Closing 
the Kafka producer with timeoutMillis = 9223372036854775807 ms.
9554 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.Sender  - Beginning shutdown of 
Kafka producer I/O thread, sending remaining records.
9554 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics  - Removed sensor with name 
connections-closed:
9554 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics  - Removed sensor with name 
connections-created:
9554 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics  - Removed sensor with name 
bytes-sent-received:
9554 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics  - Removed sensor with name bytes-sent:
9554 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics  - Removed sensor with name 
bytes-received:
9554 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics  - Removed sensor with name select-time:
9554 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics  - Removed sensor with name io-time:
9554 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics  - Removed sensor with name 
node--1.bytes-sent
9554 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics  - Removed sensor with name 
node--1.bytes-received
9554 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics  - Removed sensor with name 
node--1.latency
9554 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics  - Removed sensor with name 
node-1001.bytes-sent
9554 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics  - Removed sensor with name 
node-1001.bytes-received
9554 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.common.metrics.Metrics  - Removed sensor with name 
node-1001.latency
9554 [kafka-producer-network-thread | producer-1] DEBUG 
org.apache.kafka.clients.producer.internals.Sender  - Shutdown of Kafka 
producer I/O thread has completed.
9554 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer  - The Kafka 
producer has closed.
Closed

</LOGS producer console>



My consumer side code.

<CODE consumer>
package com.example.transaction.producer.consumer;

import com.example.transaction.producer.utils.DataObject;
import com.example.transaction.producer.utils.serde.JsonDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Properties<http://java.util.Properties>;


public class Consumer {

    private static final Logger LOGGER = 
LoggerFactory.getLogger(Consumer.class);
    public static String topic = "topic-4";

    public static void main(String[] args) {
        Properties configProperties = new Properties();
        configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"192.168.41.132:9090<http://192.168.41.132:9090>");
        
configProperties.put("group.id<http://group.id>","some-different-group-3");
        configProperties.put("enable.auto<http://enable.auto>.commit", "true");
        
configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        configProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
"read_committed");


        KafkaConsumer<Integer, DataObject> consumer = new 
KafkaConsumer(configProperties,new IntegerDeserializer(),new 
JsonDeserializer(DataObject.class));

        consumer.subscribe(Arrays.asList(topic));

        LOGGER.info<http://LOGGER.info>("*************** Starting Consumer 
*************");

        while (true) {
            ConsumerRecords<Integer, DataObject> records = consumer.poll(1000);
            records.forEach(record -> {
                System.out.printf("offset = %d\n", record.offset());
                System.out.println("Key = " + record.key().toString() + 
"\nMessage = " + record.value().toString());
            });
        }

    }
}

</CODE consumer


My Consumer Console logs.

<LOG consumer>
0    [main] INFO  org.apache.kafka.clients.consumer.ConsumerConfig  - 
ConsumerConfig values:
auto.commit.interval.ms<http://auto.commit.interval.ms> = 5000
auto.offset.reset = earliest
bootstrap.servers = [192.168.41.132:9090<http://192.168.41.132:9090>]
check.crcs = true
client.id<http://client.id> =
connections.max.idle.ms<http://connections.max.idle.ms> = 540000
enable.auto<http://enable.auto>.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms<http://fetch.max.wait.ms> = 500
fetch.min.bytes = 1
group.id<http://group.id> = some-different-group-3
heartbeat.interval.ms<http://heartbeat.interval.ms> = 3000
interceptor.classes = null
internal.leave.group<http://internal.leave.group>.on.close = true
isolation.level = read_committed
key.deserializer = class 
org.apache.kafka.common.serialization.IntegerDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms<http://max.poll.interval.ms> = 300000
max.poll.records = 500
metadata.max.age.ms<http://metadata.max.age.ms> = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms<http://metrics.sample.window.ms> = 30000
partition.assignment.strategy = [class 
org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms<http://reconnect.backoff.max.ms> = 1000
reconnect.backoff.ms<http://reconnect.backoff.ms> = 50
request.timeout.ms<http://request.timeout.ms> = 305000
retry.backoff.ms<http://retry.backoff.ms> = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name<http://sasl.kerberos.service.name> = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms<http://session.timeout.ms> = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class 
com.example.transaction.producer.utils.serde.JsonDeserializer

2    [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer  - Starting 
the Kafka consumer
139  [main] DEBUG org.apache.kafka.clients.Metadata  - Updated cluster metadata 
version 1 to Cluster(id = null, nodes = 
[192.168.41.132:9090<http://192.168.41.132:9090> (id: -1 rack: null)], 
partitions = [])
164  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name fetch-throttle-time
444  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name connections-closed:
446  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name connections-created:
446  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name bytes-sent-received:
446  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name bytes-sent:
447  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name bytes-received:
447  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name select-time:
448  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name io-time:
478  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name heartbeat-latency
479  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name join-latency
479  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name sync-latency
482  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name commit-latency
488  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name bytes-fetched
488  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name records-fetched
490  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name fetch-latency
490  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name records-lag
495  [main] INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version 
: 0.11.0.0
495  [main] INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId 
: cb8625948210849f
497  [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer  - Kafka 
consumer created
497  [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer  - Subscribed 
to topic(s): topic-4
498  [main] INFO  com.example.transaction.producer.consumer.Consumer  - 
*************** Starting Consumer *************
498  [main] DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Sending 
GroupCoordinator request for group some-different-group-3 to broker 
192.168.41.132:9090<http://192.168.41.132:9090> (id: -1 rack: null)
509  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Initiating 
connection to node -1 at 192.168.41.132:9090<http://192.168.41.132:9090>.
523  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name node--1.bytes-sent
525  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name node--1.bytes-received
526  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name node--1.latency
527  [main] DEBUG 
org.apache.kafka.common.network<http://org.apache.kafka.common.network>.Selector
  - Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 
to node -1
531  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Completed 
connection to node -1.  Fetching API versions.
531  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Initiating API 
versions fetch from node -1.
661  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Recorded API 
versions for node -1: (Produce(0): 0 to 3 [usable: 3], Fetch(1): 0 to 5 
[usable: 5], Offsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 4 [usable: 4], 
LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], 
UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], 
OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], 
FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], 
Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], 
SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], 
ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 [usable: 0], 
ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], 
DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], 
InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], 
AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], 
EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], 
TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], 
CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], 
DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0])
662  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Sending metadata 
request (type=MetadataRequest, topics=topic-4) to node -1
672  [main] DEBUG org.apache.kafka.clients.Metadata  - Updated cluster metadata 
version 2 to Cluster(id = 0WtNXiFvT_C6V9Uo1zPGVQ, nodes = 
[192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)], 
partitions = [Partition(topic = topic-4, partition = 0, leader = 1001, replicas 
= [1001], isr = [1001])])
675  [main] DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Received 
GroupCoordinator response ClientResponse(receivedTimeMs=1509541833582, 
latencyMs=170, disconnected=false, 
requestHeader={api_key=10,api_version=1,correlation_id=0,client_id=consumer-1}, 
responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', 
error=NONE, node=192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 
rack: null))) for group some-different-group-3
676  [main] INFO  
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
coordinator 192.168.41.132:9090<http://192.168.41.132:9090> (id: 2147482646 
rack: null) for group some-different-group-3.
676  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Initiating 
connection to node 2147482646 at 
192.168.41.132:9090<http://192.168.41.132:9090>.
679  [main] DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Sending 
synchronous auto-commit of offsets {} for group some-different-group-3
679  [main] INFO  
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Revoking 
previously assigned partitions [] for group some-different-group-3
679  [main] INFO  
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - (Re-)joining 
group some-different-group-3
682  [main] DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Sending 
JoinGroup ((type: JoinGroupRequest, groupId=some-different-group-3, 
sessionTimeout=10000, rebalanceTimeout=300000, memberId=, 
protocolType=consumer, 
groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@133e16fd))
 to coordinator 192.168.41.132:9090<http://192.168.41.132:9090> (id: 2147482646 
rack: null)
683  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name node-2147482646.bytes-sent
684  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name node-2147482646.bytes-received
684  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name node-2147482646.latency
684  [main] DEBUG 
org.apache.kafka.common.network<http://org.apache.kafka.common.network>.Selector
  - Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 
to node 2147482646
685  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Completed 
connection to node 2147482646.  Fetching API versions.
685  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Initiating API 
versions fetch from node 2147482646.
692  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Recorded API 
versions for node 2147482646: (Produce(0): 0 to 3 [usable: 3], Fetch(1): 0 to 5 
[usable: 5], Offsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 4 [usable: 4], 
LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], 
UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], 
OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], 
FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], 
Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], 
SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], 
ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 [usable: 0], 
ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], 
DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], 
InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], 
AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], 
EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], 
TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], 
CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], 
DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0])
695  [kafka-coordinator-heartbeat-thread | some-different-group-3] DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Heartbeat 
thread for group some-different-group-3 started
717  [main] DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Received 
successful JoinGroup response for group some-different-group-3: 
org.apache.kafka.common.requests.JoinGroupResponse@140e5a13
718  [main] DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Performing 
assignment for group some-different-group-3 using strategy range with 
subscriptions 
{consumer-1-b3c2f6d7-ac1d-4cd3-8c9a-1f00fe9ff806=Subscription(topics=[topic-4])}
719  [main] DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Finished 
assignment for group some-different-group-3: 
{consumer-1-b3c2f6d7-ac1d-4cd3-8c9a-1f00fe9ff806=Assignment(partitions=[topic-4-0])}
721  [main] DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Sending 
leader SyncGroup for group some-different-group-3 to coordinator 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 2147482646 rack: null): 
(type=SyncGroupRequest, groupId=some-different-group-3, generationId=6, 
memberId=consumer-1-b3c2f6d7-ac1d-4cd3-8c9a-1f00fe9ff806, 
groupAssignment=consumer-1-b3c2f6d7-ac1d-4cd3-8c9a-1f00fe9ff806)
787  [main] INFO  
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Successfully 
joined group some-different-group-3 with generation 6
787  [main] INFO  
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Setting 
newly assigned partitions [topic-4-0] for group some-different-group-3
803  [main] DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group 
some-different-group-3 fetching committed offsets for partitions: [topic-4-0]
819  [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - 
Resetting offset for partition topic-4-0 to the committed offset 0
819  [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added 
READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
819  [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - 
Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
819  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Initiating 
connection to node 1001 at 192.168.41.132:9090<http://192.168.41.132:9090>.
819  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name node-1001.bytes-sent
819  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name node-1001.bytes-received
819  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name node-1001.latency
819  [main] DEBUG 
org.apache.kafka.common.network<http://org.apache.kafka.common.network>.Selector
  - Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 
to node 1001
819  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Completed 
connection to node 1001.  Fetching API versions.
819  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Initiating API 
versions fetch from node 1001.
819  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Recorded API 
versions for node 1001: (Produce(0): 0 to 3 [usable: 3], Fetch(1): 0 to 5 
[usable: 5], Offsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 4 [usable: 4], 
LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], 
UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], 
OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], 
FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], 
Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], 
SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], 
ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 [usable: 0], 
ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], 
DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], 
InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], 
AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], 
EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], 
TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], 
CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], 
DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0])
1358 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch 
READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data 
(error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, 
abortedTransactions = [], recordsSizeInBytes=0)
1358 [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name topic.topic-4.bytes-fetched
1358 [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name topic.topic-4.records-fetched
1358 [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with 
name topic-4-0.records-lag
1358 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added 
READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
1358 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - 
Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
1871 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch 
READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data 
(error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, 
abortedTransactions = [], recordsSizeInBytes=0)
1871 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added 
READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
1871 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - 
Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
2388 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch 
READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data 
(error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, 
abortedTransactions = [], recordsSizeInBytes=0)
2388 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added 
READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
2388 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - 
Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
2906 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch 
READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data 
(error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, 
abortedTransactions = [], recordsSizeInBytes=0)
2907 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added 
READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
2907 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - 
Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
3429 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch 
READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data 
(error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, 
abortedTransactions = [], recordsSizeInBytes=0)
3429 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added 
READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
3429 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - 
Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
3792 [kafka-coordinator-heartbeat-thread | some-different-group-3] DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Sending 
Heartbeat request for group some-different-group-3 to coordinator 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 2147482646 rack: null)
3792 [main] DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Received 
successful Heartbeat response for group some-different-group-3
3946 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch 
READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data 
(error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, 
abortedTransactions = [], recordsSizeInBytes=0)
3946 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added 
READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
3946 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - 
Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
4462 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch 
READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data 
(error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, 
abortedTransactions = [], recordsSizeInBytes=0)
4462 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added 
READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
4462 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - 
Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
4979 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch 
READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data 
(error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, 
abortedTransactions = [], recordsSizeInBytes=0)
4979 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added 
READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
4979 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - 
Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
5496 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch 
READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data 
(error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, 
abortedTransactions = [], recordsSizeInBytes=0)
5496 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added 
READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
5496 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - 
Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
5797 [main] DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Sending 
asynchronous auto-commit of offsets {topic-4-0=OffsetAndMetadata{offset=0, 
metadata=''}} for group some-different-group-3
5819 [main] DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group 
some-different-group-3 committed offset 0 for partition topic-4-0
5819 [main] DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Completed 
auto-commit of offsets {topic-4-0=OffsetAndMetadata{offset=0, metadata=''}} for 
group some-different-group-3
6015 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch 
READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data 
(error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, 
abortedTransactions = [], recordsSizeInBytes=0)
6015 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added 
READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
6015 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - 
Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
6536 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch 
READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data 
(error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, 
abortedTransactions = [], recordsSizeInBytes=0)
6536 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added 
READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
6536 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - 
Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
6800 [kafka-coordinator-heartbeat-thread | some-different-group-3] DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Sending 
Heartbeat request for group some-different-group-3 to coordinator 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 2147482646 rack: null)
6800 [main] DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Received 
successful Heartbeat response for group some-different-group-3
7038 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch 
READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data 
(error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, 
abortedTransactions = [], recordsSizeInBytes=0)
7038 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added 
READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
7038 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - 
Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
7554 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch 
READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data 
(error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, 
abortedTransactions = [], recordsSizeInBytes=0)
7554 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added 
READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
7554 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - 
Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
8070 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch 
READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data 
(error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, 
abortedTransactions = [], recordsSizeInBytes=0)
8070 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added 
READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
8070 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - 
Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
8587 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch 
READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data 
(error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, 
abortedTransactions = [], recordsSizeInBytes=0)
8587 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added 
READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
8587 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - 
Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
9104 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch 
READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data 
(error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, 
abortedTransactions = [], recordsSizeInBytes=0)
9104 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added 
READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
9104 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - 
Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 
192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)
</LOGS consumer>



Reply via email to