Hi All, I am trying to make a hello world example for Transactional Producer and trying to consume. I am doing all this in plain java.
I can produce but consumer is not consuming message. I searched over other places and I found some people have same problem. Right now, I am using single broker. I tried same with 3 brokers also and it was not working at that time also. I don’t know what I am missing and where… :p in Consumer I am missing something or in producer. I have attached Producer and Consumer codes and console logs with my broker logs Thanks, Abhishek My Broker logs after producing messages <LOGS broker> [2017-11-01 18:45:55,000] INFO Updated PartitionLeaderEpoch. New: {epoch:4, offset:3}, Current: {epoch:3, offset0} for Partition: __transaction_state-2. Cache now contains 1 entries. (kafka.server.epoch.LeaderEpochFileCache) [2017-11-01 18:46:03,482] INFO [Transaction Coordinator 1001]: Initialized transactionalId TXN_ID:0.5031925219291776-156417066 with producerId 4001 and producer epoch 0 on partition __transaction_state-2 (kafka.coordinator.transaction.TransactionCoordinator) </LOGS broker> My producer code is <CODE producer> import com.example.transaction.producer.utils.DataObject; import com.example.transaction.producer.utils.serde.JsonSerializer; import kafka.common.AuthorizationException; import kafka.common.KafkaException; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.errors.OutOfOrderSequenceException; import org.apache.kafka.common.errors.ProducerFencedException; import java.util.*; public class SampleProducer { public static String topic = "topic-4"; public static void main(String[] args) { Properties configProperties = new Properties(); //configProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "some-client-id"); configProperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "TXN_ID:" + new Random().nextDouble() + new Random().nextInt()); configProperties.put("acks", "all"); configProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); configProperties.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); configProperties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); configProperties.put("value.serializer", JsonSerializer.class); configProperties.put("bootstrap.servers", "192.168.41.132:9090<http://192.168.41.132:9090>"); KafkaProducer<Integer, DataObject>producer = new KafkaProducer<>(configProperties); System.out.println("Init Transaction"); producer.initTransactions(); try { System.out.println("transaction initialised going to begin transaction"); producer.beginTransaction(); System.out.println("Transaction started"); ProducerRecord rec = new ProducerRecord(topic, 5, new DataObject(5, "Hello, World!")); RecordMetadata metadata = (RecordMetadata) producer.send(rec).get(); System.out.println("The offset of the record we just sent is: " + metadata.offset()); metadata = (RecordMetadata) producer.send(rec).get(); System.out.println("The offset of the record we just sent is: " + metadata.offset()); producer.commitTransaction(); System.out.println("Transaction Committed"); }catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e){ // We can't recover from these exceptions, so our only option is to close the producer and exit. System.out.println("Connection closed but commit failed. We can't recover"); producer.close(); }catch(KafkaException e) { // For all other exceptions, just abort the transaction and try again. System.out.println("Abort Transaction"); producer.abortTransaction(); }catch (Exception x){} producer.close(); System.out.println("Closed"); } } </CODE proucer> These are my producer console logs <LOGS producer> 0 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: acks = all batch.size = 16384 bootstrap.servers = [192.168.41.132:9090<http://192.168.41.132:9090>] buffer.memory = 33554432 client.id<http://client.id> = compression.type = none connections.max.idle.ms<http://connections.max.idle.ms> = 540000 enable.idempotence = true interceptor.classes = null key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer linger.ms<http://linger.ms> = 0 max.block.ms<http://max.block.ms> = 60000 max.in<http://max.in>.flight.requests.per.connection = 5 max.request.size = 1048576 metadata.max.age.ms<http://metadata.max.age.ms> = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms<http://metrics.sample.window.ms> = 30000 partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner receive.buffer.bytes = 32768 reconnect.backoff.max.ms<http://reconnect.backoff.max.ms> = 1000 reconnect.backoff.ms<http://reconnect.backoff.ms> = 50 request.timeout.ms<http://request.timeout.ms> = 30000 retries = 2147483647 retry.backoff.ms<http://retry.backoff.ms> = 100 sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name<http://sasl.kerberos.service.name> = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT send.buffer.bytes = 131072 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS transaction.timeout.ms<http://transaction.timeout.ms> = 60000 transactional.id<http://transactional.id> = TXN_ID:0.5031925219291776-156417066 value.serializer = class com.example.transaction.producer.utils.serde.JsonSerializer 261 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - Instantiated a transactional producer. 265 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - Overriding the default max.in<http://max.in>.flight.requests.per.connection to 1 since idempontence is enabled. 274 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bufferpool-wait-time 281 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name buffer-exhausted-records 284 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 1 to Cluster(id = null, nodes = [192.168.41.132:9090<http://192.168.41.132:9090> (id: -1 rack: null)], partitions = []) 297 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name produce-throttle-time 564 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-closed: 564 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-created: 565 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent-received: 565 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent: 565 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-received: 566 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name select-time: 567 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name io-time: 573 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name batch-size 574 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name compression-rate 574 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name queue-time 575 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name request-time 575 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name records-per-request 577 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name record-retries 577 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name errors 577 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name record-size-max 579 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name batch-split-rate 582 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - Starting Kafka producer I/O thread. 585 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.11.0.0 585 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : cb8625948210849f 586 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - Kafka producer started Init Transaction 588 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId TXN_ID:0.5031925219291776-156417066] Transition from state UNINITIALIZED to INITIALIZING 588 [main] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId TXN_ID:0.5031925219291776-156417066] ProducerId set to -1 with epoch -1 594 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId TXN_ID:0.5031925219291776-156417066] Enqueuing transactional request (type=InitProducerIdRequest, transactionalId=TXN_ID:0.5031925219291776-156417066, transactionTimeoutMs=60000) 598 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId TXN_ID:0.5031925219291776-156417066] Enqueuing transactional request (type=FindCoordinatorRequest, coordinatorKey=TXN_ID:0.5031925219291776-156417066, coordinatorType=TRANSACTION) 598 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId TXN_ID:0.5031925219291776-156417066] Enqueuing transactional request (type=InitProducerIdRequest, transactionalId=TXN_ID:0.5031925219291776-156417066, transactionTimeoutMs=60000) 712 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at 192.168.41.132:9090<http://192.168.41.132:9090>. 712 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent 712 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-received 712 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.latency 712 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network<http://org.apache.kafka.common.network>.Selector - Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1 712 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node -1. Fetching API versions. 712 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initiating API versions fetch from node -1. 897 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Recorded API versions for node -1: (Produce(0): 0 to 3 [usable: 3], Fetch(1): 0 to 5 [usable: 5], Offsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 4 [usable: 4], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0]) 898 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [TransactionalId TXN_ID:0.5031925219291776-156417066] Sending transactional request (type=FindCoordinatorRequest, coordinatorKey=TXN_ID:0.5031925219291776-156417066, coordinatorType=TRANSACTION) to node 192.168.41.132:9090<http://192.168.41.132:9090> (id: -1 rack: null) 901 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node 1001 at 192.168.41.132:9090<http://192.168.41.132:9090>. 901 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-1001.bytes-sent 902 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-1001.bytes-received 902 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-1001.latency 903 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network<http://org.apache.kafka.common.network>.Selector - Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 1001 903 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node 1001. Fetching API versions. 903 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initiating API versions fetch from node 1001. 905 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Recorded API versions for node 1001: (Produce(0): 0 to 3 [usable: 3], Fetch(1): 0 to 5 [usable: 5], Offsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 4 [usable: 4], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0]) 1009 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [TransactionalId TXN_ID:0.5031925219291776-156417066] Sending transactional request (type=InitProducerIdRequest, transactionalId=TXN_ID:0.5031925219291776-156417066, transactionTimeoutMs=60000) to node 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 9491 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId TXN_ID:0.5031925219291776-156417066] ProducerId set to 4001 with epoch 0 9491 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId TXN_ID:0.5031925219291776-156417066] Transition from state INITIALIZING to READY transaction initialised going to begin transaction 9491 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId TXN_ID:0.5031925219291776-156417066] Transition from state READY to IN_TRANSACTION Transaction started 9491 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Sending metadata request (type=MetadataRequest, topics=topic-4) to node -1 9491 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 2 to Cluster(id = 0WtNXiFvT_C6V9Uo1zPGVQ, nodes = [192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)], partitions = [Partition(topic = topic-4, partition = 0, leader = 1001, replicas = [1001], isr = [1001])]) 9523 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId TXN_ID:0.5031925219291776-156417066] Begin adding new partition topic-4-0 to transaction 9523 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId TXN_ID:0.5031925219291776-156417066] Enqueuing transactional request (type=AddPartitionsToTxnRequest, transactionalId=TXN_ID:0.5031925219291776-156417066, producerId=4001, producerEpoch=0, partitions=[topic-4-0]) 9523 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [TransactionalId TXN_ID:0.5031925219291776-156417066] Sending transactional request (type=AddPartitionsToTxnRequest, transactionalId=TXN_ID:0.5031925219291776-156417066, producerId=4001, producerEpoch=0, partitions=[topic-4-0]) to node 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 9538 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId TXN_ID:0.5031925219291776-156417066] Successfully added partitions [topic-4-0] to transaction 9538 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.RecordAccumulator - Assigning sequence number 0 from producer (producerId=4001, epoch=0) to dequeued batch from partition topic-4-0 bound for 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null). 9538 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.topic-4.records-per-batch 9538 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.topic-4.bytes 9538 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.topic-4.compression-rate 9538 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.topic-4.record-retries 9538 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.topic-4.record-errors 9538 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - Incremented sequence number for topic-partition topic-4-0 to 1 The offset of the record we just sent is: 5 9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.RecordAccumulator - Assigning sequence number 1 from producer (producerId=4001, epoch=0) to dequeued batch from partition topic-4-0 bound for 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null). 9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - Incremented sequence number for topic-partition topic-4-0 to 2 The offset of the record we just sent is: 6 9554 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId TXN_ID:0.5031925219291776-156417066] Transition from state IN_TRANSACTION to COMMITTING_TRANSACTION 9554 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId TXN_ID:0.5031925219291776-156417066] Enqueuing transactional request (type=EndTxnRequest, transactionalId=TXN_ID:0.5031925219291776-156417066, producerId=4001, producerEpoch=0, result=COMMIT) 9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [TransactionalId TXN_ID:0.5031925219291776-156417066] Sending transactional request (type=EndTxnRequest, transactionalId=TXN_ID:0.5031925219291776-156417066, producerId=4001, producerEpoch=0, result=COMMIT) to node 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId TXN_ID:0.5031925219291776-156417066] Transition from state COMMITTING_TRANSACTION to READY Transaction Committed 9554 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. 9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - Beginning shutdown of Kafka producer I/O thread, sending remaining records. 9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-closed: 9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-created: 9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent-received: 9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent: 9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-received: 9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name select-time: 9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name io-time: 9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.bytes-sent 9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.bytes-received 9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.latency 9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-1001.bytes-sent 9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-1001.bytes-received 9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-1001.latency 9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - Shutdown of Kafka producer I/O thread has completed. 9554 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - The Kafka producer has closed. Closed </LOGS producer console> My consumer side code. <CODE consumer> package com.example.transaction.producer.consumer; import com.example.transaction.producer.utils.DataObject; import com.example.transaction.producer.utils.serde.JsonDeserializer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Arrays; import java.util.Properties<http://java.util.Properties>; public class Consumer { private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class); public static String topic = "topic-4"; public static void main(String[] args) { Properties configProperties = new Properties(); configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.41.132:9090<http://192.168.41.132:9090>"); configProperties.put("group.id<http://group.id>","some-different-group-3"); configProperties.put("enable.auto<http://enable.auto>.commit", "true"); configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); configProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); KafkaConsumer<Integer, DataObject> consumer = new KafkaConsumer(configProperties,new IntegerDeserializer(),new JsonDeserializer(DataObject.class)); consumer.subscribe(Arrays.asList(topic)); LOGGER.info<http://LOGGER.info>("*************** Starting Consumer *************"); while (true) { ConsumerRecords<Integer, DataObject> records = consumer.poll(1000); records.forEach(record -> { System.out.printf("offset = %d\n", record.offset()); System.out.println("Key = " + record.key().toString() + "\nMessage = " + record.value().toString()); }); } } } </CODE consumer My Consumer Console logs. <LOG consumer> 0 [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: auto.commit.interval.ms<http://auto.commit.interval.ms> = 5000 auto.offset.reset = earliest bootstrap.servers = [192.168.41.132:9090<http://192.168.41.132:9090>] check.crcs = true client.id<http://client.id> = connections.max.idle.ms<http://connections.max.idle.ms> = 540000 enable.auto<http://enable.auto>.commit = true exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms<http://fetch.max.wait.ms> = 500 fetch.min.bytes = 1 group.id<http://group.id> = some-different-group-3 heartbeat.interval.ms<http://heartbeat.interval.ms> = 3000 interceptor.classes = null internal.leave.group<http://internal.leave.group>.on.close = true isolation.level = read_committed key.deserializer = class org.apache.kafka.common.serialization.IntegerDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms<http://max.poll.interval.ms> = 300000 max.poll.records = 500 metadata.max.age.ms<http://metadata.max.age.ms> = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms<http://metrics.sample.window.ms> = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms<http://reconnect.backoff.max.ms> = 1000 reconnect.backoff.ms<http://reconnect.backoff.ms> = 50 request.timeout.ms<http://request.timeout.ms> = 305000 retry.backoff.ms<http://retry.backoff.ms> = 100 sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name<http://sasl.kerberos.service.name> = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT send.buffer.bytes = 131072 session.timeout.ms<http://session.timeout.ms> = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class com.example.transaction.producer.utils.serde.JsonDeserializer 2 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - Starting the Kafka consumer 139 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 1 to Cluster(id = null, nodes = [192.168.41.132:9090<http://192.168.41.132:9090> (id: -1 rack: null)], partitions = []) 164 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name fetch-throttle-time 444 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-closed: 446 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-created: 446 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent-received: 446 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent: 447 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-received: 447 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name select-time: 448 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name io-time: 478 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name heartbeat-latency 479 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name join-latency 479 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name sync-latency 482 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name commit-latency 488 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-fetched 488 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name records-fetched 490 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name fetch-latency 490 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name records-lag 495 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.11.0.0 495 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : cb8625948210849f 497 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - Kafka consumer created 497 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - Subscribed to topic(s): topic-4 498 [main] INFO com.example.transaction.producer.consumer.Consumer - *************** Starting Consumer ************* 498 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending GroupCoordinator request for group some-different-group-3 to broker 192.168.41.132:9090<http://192.168.41.132:9090> (id: -1 rack: null) 509 [main] DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at 192.168.41.132:9090<http://192.168.41.132:9090>. 523 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent 525 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-received 526 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.latency 527 [main] DEBUG org.apache.kafka.common.network<http://org.apache.kafka.common.network>.Selector - Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1 531 [main] DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node -1. Fetching API versions. 531 [main] DEBUG org.apache.kafka.clients.NetworkClient - Initiating API versions fetch from node -1. 661 [main] DEBUG org.apache.kafka.clients.NetworkClient - Recorded API versions for node -1: (Produce(0): 0 to 3 [usable: 3], Fetch(1): 0 to 5 [usable: 5], Offsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 4 [usable: 4], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0]) 662 [main] DEBUG org.apache.kafka.clients.NetworkClient - Sending metadata request (type=MetadataRequest, topics=topic-4) to node -1 672 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 2 to Cluster(id = 0WtNXiFvT_C6V9Uo1zPGVQ, nodes = [192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null)], partitions = [Partition(topic = topic-4, partition = 0, leader = 1001, replicas = [1001], isr = [1001])]) 675 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Received GroupCoordinator response ClientResponse(receivedTimeMs=1509541833582, latencyMs=170, disconnected=false, requestHeader={api_key=10,api_version=1,correlation_id=0,client_id=consumer-1}, responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null))) for group some-different-group-3 676 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator 192.168.41.132:9090<http://192.168.41.132:9090> (id: 2147482646 rack: null) for group some-different-group-3. 676 [main] DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node 2147482646 at 192.168.41.132:9090<http://192.168.41.132:9090>. 679 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Sending synchronous auto-commit of offsets {} for group some-different-group-3 679 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Revoking previously assigned partitions [] for group some-different-group-3 679 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining group some-different-group-3 682 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending JoinGroup ((type: JoinGroupRequest, groupId=some-different-group-3, sessionTimeout=10000, rebalanceTimeout=300000, memberId=, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@133e16fd)) to coordinator 192.168.41.132:9090<http://192.168.41.132:9090> (id: 2147482646 rack: null) 683 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-2147482646.bytes-sent 684 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-2147482646.bytes-received 684 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-2147482646.latency 684 [main] DEBUG org.apache.kafka.common.network<http://org.apache.kafka.common.network>.Selector - Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 2147482646 685 [main] DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node 2147482646. Fetching API versions. 685 [main] DEBUG org.apache.kafka.clients.NetworkClient - Initiating API versions fetch from node 2147482646. 692 [main] DEBUG org.apache.kafka.clients.NetworkClient - Recorded API versions for node 2147482646: (Produce(0): 0 to 3 [usable: 3], Fetch(1): 0 to 5 [usable: 5], Offsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 4 [usable: 4], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0]) 695 [kafka-coordinator-heartbeat-thread | some-different-group-3] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Heartbeat thread for group some-different-group-3 started 717 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Received successful JoinGroup response for group some-different-group-3: org.apache.kafka.common.requests.JoinGroupResponse@140e5a13 718 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Performing assignment for group some-different-group-3 using strategy range with subscriptions {consumer-1-b3c2f6d7-ac1d-4cd3-8c9a-1f00fe9ff806=Subscription(topics=[topic-4])} 719 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Finished assignment for group some-different-group-3: {consumer-1-b3c2f6d7-ac1d-4cd3-8c9a-1f00fe9ff806=Assignment(partitions=[topic-4-0])} 721 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending leader SyncGroup for group some-different-group-3 to coordinator 192.168.41.132:9090<http://192.168.41.132:9090> (id: 2147482646 rack: null): (type=SyncGroupRequest, groupId=some-different-group-3, generationId=6, memberId=consumer-1-b3c2f6d7-ac1d-4cd3-8c9a-1f00fe9ff806, groupAssignment=consumer-1-b3c2f6d7-ac1d-4cd3-8c9a-1f00fe9ff806) 787 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Successfully joined group some-different-group-3 with generation 6 787 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Setting newly assigned partitions [topic-4-0] for group some-different-group-3 803 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Group some-different-group-3 fetching committed offsets for partitions: [topic-4-0] 819 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition topic-4-0 to the committed offset 0 819 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 819 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 819 [main] DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node 1001 at 192.168.41.132:9090<http://192.168.41.132:9090>. 819 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-1001.bytes-sent 819 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-1001.bytes-received 819 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-1001.latency 819 [main] DEBUG org.apache.kafka.common.network<http://org.apache.kafka.common.network>.Selector - Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 1001 819 [main] DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node 1001. Fetching API versions. 819 [main] DEBUG org.apache.kafka.clients.NetworkClient - Initiating API versions fetch from node 1001. 819 [main] DEBUG org.apache.kafka.clients.NetworkClient - Recorded API versions for node 1001: (Produce(0): 0 to 3 [usable: 3], Fetch(1): 0 to 5 [usable: 5], Offsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 4 [usable: 4], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0]) 1358 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0) 1358 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.topic-4.bytes-fetched 1358 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.topic-4.records-fetched 1358 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic-4-0.records-lag 1358 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 1358 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 1871 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0) 1871 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 1871 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 2388 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0) 2388 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 2388 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 2906 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0) 2907 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 2907 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 3429 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0) 3429 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 3429 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 3792 [kafka-coordinator-heartbeat-thread | some-different-group-3] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending Heartbeat request for group some-different-group-3 to coordinator 192.168.41.132:9090<http://192.168.41.132:9090> (id: 2147482646 rack: null) 3792 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Received successful Heartbeat response for group some-different-group-3 3946 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0) 3946 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 3946 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 4462 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0) 4462 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 4462 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 4979 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0) 4979 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 4979 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 5496 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0) 5496 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 5496 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 5797 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Sending asynchronous auto-commit of offsets {topic-4-0=OffsetAndMetadata{offset=0, metadata=''}} for group some-different-group-3 5819 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Group some-different-group-3 committed offset 0 for partition topic-4-0 5819 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Completed auto-commit of offsets {topic-4-0=OffsetAndMetadata{offset=0, metadata=''}} for group some-different-group-3 6015 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0) 6015 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 6015 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 6536 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0) 6536 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 6536 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 6800 [kafka-coordinator-heartbeat-thread | some-different-group-3] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending Heartbeat request for group some-different-group-3 to coordinator 192.168.41.132:9090<http://192.168.41.132:9090> (id: 2147482646 rack: null) 6800 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Received successful Heartbeat response for group some-different-group-3 7038 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0) 7038 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 7038 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 7554 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0) 7554 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 7554 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 8070 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0) 8070 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 8070 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 8587 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0) 8587 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 8587 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 9104 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0) 9104 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) 9104 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090<http://192.168.41.132:9090> (id: 1001 rack: null) </LOGS consumer>