We are evaluating Kafka, starting with the latest release, v0.9.0.1. A very basic use case for us is to have new consumers begin from the earliest entry the first time they start, and resume from the offset where they left off on every subsequent restart. From what I have read, setting auto.offset.reset = earliest should do this, but it does not seem to work.
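For reference, here is a minimal sketch of the configuration this use case relies on, written with plain java.util.Properties so it runs without a Kafka dependency. The values are the ones that appear in the ConsumerConfig dump in the logs; the class and method names (ConsumerProps, build) are hypothetical and exist only for illustration.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka: collects the consumer settings
// relevant to "start from earliest once, then resume from committed offsets".
class ConsumerProps {
    public static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Committed offsets are stored per group.id, so a restart must reuse it.
        props.setProperty("group.id", groupId);
        // Consulted only when the group has no committed offset for a
        // partition (or the committed offset is out of range).
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("10.30.26.98:32774", "DemoConsumer7");
        System.out.println("auto.offset.reset = " + props.getProperty("auto.offset.reset"));
    }
}
```

The resulting Properties object would be passed to the KafkaConsumer constructor. As I understand the documentation, auto.offset.reset only matters while the group has no valid committed offset, so a restart with the same group.id should continue from the committed position instead.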

consumer.subscribe(Collections.singletonList(KafkaProperties.topic));
ConsumerRecords<Integer, String> records = consumer.poll(1000);
for (ConsumerRecord<Integer, String> record : records) {
    System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}

LOGS:

[he.kafka.clients.consumer.ConsumerConfig] - ConsumerConfig values:
    request.timeout.ms = 40000
    check.crcs = true
    retry.backoff.ms = 100
    ssl.truststore.password = null
    ssl.keymanager.algorithm = SunX509
    receive.buffer.bytes = 32768
    ssl.cipher.suites = null
    ssl.key.password = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.provider = null
    sasl.kerberos.service.name = null
    session.timeout.ms = 30000
    sasl.kerberos.ticket.renew.window.factor = 0.8
    bootstrap.servers = [10.30.26.98:32774]
    client.id = consumer-7
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    key.deserializer = class org.apache.kafka.common.serialization.IntegerDeserializer
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    auto.offset.reset = earliest <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    ssl.endpoint.identification.algorithm = null
    max.partition.fetch.bytes = 1048576
    ssl.keystore.location = null
    ssl.truststore.location = null
    ssl.keystore.password = null
    metrics.sample.window.ms = 30000
    metadata.max.age.ms = 300000
    security.protocol = PLAINTEXT
    auto.commit.interval.ms = 1000
    ssl.protocol = TLS
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    ssl.trustmanager.algorithm = PKIX
    group.id = DemoConsumer7
    enable.auto.commit = true
    metric.reporters = []
    ssl.truststore.type = JKS
    send.buffer.bytes = 131072
    reconnect.backoff.ms = 50
    metrics.num.samples = 2
    ssl.keystore.type = JKS
    heartbeat.interval.ms = 3000
- 2016-03-21 17:11:34.786 DEBUG main [che.kafka.clients.consumer.KafkaConsumer] - Starting the Kafka consumer
- 2016-03-21 17:11:39.318 DEBUG main [org.apache.kafka.clients.Metadata] - Updated cluster metadata version 1 to Cluster(nodes = [Node(-1, 10.30.26.98, 32774)], partitions = [])
- 2016-03-21 17:11:39.406 DEBUG main [org.apache.kafka.common.metrics.Metrics] - Added sensor with name connections-closed:client-id-consumer-7
- 2016-03-21 17:11:39.414 DEBUG main [org.apache.kafka.common.metrics.Metrics] - Added sensor with name connections-created:client-id-consumer-7
- 2016-03-21 17:11:39.415 DEBUG main [org.apache.kafka.common.metrics.Metrics] - Added sensor with name bytes-sent-received:client-id-consumer-7
- 2016-03-21 17:11:39.415 DEBUG main [org.apache.kafka.common.metrics.Metrics] - Added sensor with name bytes-sent:client-id-consumer-7
- 2016-03-21 17:11:39.418 DEBUG main [org.apache.kafka.common.metrics.Metrics] - Added sensor with name bytes-received:client-id-consumer-7
- 2016-03-21 17:11:39.418 DEBUG main [org.apache.kafka.common.metrics.Metrics] - Added sensor with name select-time:client-id-consumer-7
- 2016-03-21 17:11:39.419 DEBUG main [org.apache.kafka.common.metrics.Metrics] - Added sensor with name io-time:client-id-consumer-7
- 2016-03-21 17:11:39.448 DEBUG main [org.apache.kafka.common.metrics.Metrics] - Added sensor with name heartbeat-latency
- 2016-03-21 17:11:39.448 DEBUG main [org.apache.kafka.common.metrics.Metrics] - Added sensor with name join-latency
- 2016-03-21 17:11:39.448 DEBUG main [org.apache.kafka.common.metrics.Metrics] - Added sensor with name sync-latency
- 2016-03-21 17:11:39.450 DEBUG main [org.apache.kafka.common.metrics.Metrics] - Added sensor with name commit-latency
- 2016-03-21 17:11:39.456 DEBUG main [org.apache.kafka.common.metrics.Metrics] - Added sensor with name bytes-fetched
- 2016-03-21 17:11:39.456 DEBUG main [org.apache.kafka.common.metrics.Metrics] - Added sensor with name records-fetched
- 2016-03-21 17:11:39.456 DEBUG main [org.apache.kafka.common.metrics.Metrics] - Added sensor with name fetch-latency
- 2016-03-21 17:11:39.457 DEBUG main [org.apache.kafka.common.metrics.Metrics] - Added sensor with name records-lag
- 2016-03-21 17:11:39.457 DEBUG main [org.apache.kafka.common.metrics.Metrics] - Added sensor with name fetch-throttle-time
- 2016-03-21 17:11:39.458 INFO main [.apache.kafka.common.utils.AppInfoParser] - Kafka version : 0.9.0.1
- 2016-03-21 17:11:39.458 INFO main [.apache.kafka.common.utils.AppInfoParser] - Kafka commitId : 23c69d62a0cabf06
- 2016-03-21 17:11:39.459 DEBUG main [che.kafka.clients.consumer.KafkaConsumer] - Kafka consumer created
- 2016-03-21 17:11:39.460 INFO KafkaConsumerExample [s.nextgen.kafka.examples.SimpleConsumer6] - [KafkaConsumerExample], Starting
- 2016-03-21 17:11:39.460 INFO KafkaConsumerExample [s.nextgen.kafka.examples.SimpleConsumer6] - [KafkaConsumerExample], Starting
- 2016-03-21 17:11:39.461 DEBUG KafkaConsumerExample [che.kafka.clients.consumer.KafkaConsumer] - Subscribed to topic(s): test-topic
- 2016-03-21 17:11:39.461 DEBUG KafkaConsumerExample [s.consumer.internals.AbstractCoordinator] - Issuing group metadata request to broker -1
- 2016-03-21 17:11:39.473 DEBUG KafkaConsumerExample [org.apache.kafka.clients.NetworkClient] - Initiating connection to node -1 at 10.30.26.98:32774.
- 2016-03-21 17:11:39.479 DEBUG KafkaConsumerExample [org.apache.kafka.common.metrics.Metrics] - Added sensor with name node--1.bytes-sent
- 2016-03-21 17:11:39.480 DEBUG KafkaConsumerExample [org.apache.kafka.common.metrics.Metrics] - Added sensor with name node--1.bytes-received
- 2016-03-21 17:11:39.480 DEBUG KafkaConsumerExample [org.apache.kafka.common.metrics.Metrics] - Added sensor with name node--1.latency
- 2016-03-21 17:11:39.480 DEBUG KafkaConsumerExample [org.apache.kafka.clients.NetworkClient] - Completed connection to node -1
- 2016-03-21 17:11:39.576 DEBUG KafkaConsumerExample [org.apache.kafka.clients.NetworkClient] - Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=1,client_id=consumer-7}, body={topics=[test-topic]}), isInitiatedByNetworkClient, createdTimeMs=1458605499573, sendTimeMs=0) to node -1
- 2016-03-21 17:11:39.604 DEBUG KafkaConsumerExample [org.apache.kafka.clients.Metadata] - Updated cluster metadata version 2 to Cluster(nodes = [Node(1001, 10.30.26.98, 32774)], partitions = [Partition(topic = test-topic, partition = 8, leader = 1001, replicas = [1001,], isr = [1001,], Partition(topic = test-topic, partition = 7, leader = 1001, replicas = [1001,], isr = [1001,], Partition(topic = test-topic, partition = 2, leader = 1001, replicas = [1001,], isr = [1001,], Partition(topic = test-topic, partition = 4, leader = 1001, replicas = [1001,], isr = [1001,], Partition(topic = test-topic, partition = 3, leader = 1001, replicas = [1001,], isr = [1001,], Partition(topic = test-topic, partition = 6, leader = 1001, replicas = [1001,], isr = [1001,], Partition(topic = test-topic, partition = 0, leader = 1001, replicas = [1001,], isr = [1001,], Partition(topic = test-topic, partition = 1, leader = 1001, replicas = [1001,], isr = [1001,], Partition(topic = test-topic, partition = 5, leader = 1001, replicas = [1001,], isr = [1001,], Partition(topic = test-topic, partition = 9, leader = 1001, replicas = [1001,], isr = [1001,]])
- 2016-03-21 17:11:39.605 DEBUG KafkaConsumerExample [s.consumer.internals.AbstractCoordinator] - Issuing group metadata request to broker 1001
- 2016-03-21 17:11:39.605 DEBUG KafkaConsumerExample [org.apache.kafka.clients.NetworkClient] - Initiating connection to node 1001 at 10.30.26.98:32774.
- 2016-03-21 17:11:39.610 DEBUG KafkaConsumerExample [org.apache.kafka.common.metrics.Metrics] - Added sensor with name node-1001.bytes-sent
- 2016-03-21 17:11:39.610 DEBUG KafkaConsumerExample [org.apache.kafka.common.metrics.Metrics] - Added sensor with name node-1001.bytes-received
- 2016-03-21 17:11:39.611 DEBUG KafkaConsumerExample [org.apache.kafka.common.metrics.Metrics] - Added sensor with name node-1001.latency
- 2016-03-21 17:11:39.611 DEBUG KafkaConsumerExample [org.apache.kafka.clients.NetworkClient] - Completed connection to node 1001
- 2016-03-21 17:11:39.618 DEBUG KafkaConsumerExample [s.consumer.internals.AbstractCoordinator] - Group metadata response ClientResponse(receivedTimeMs=1458605499617, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@18399f62, request=RequestSend(header={api_key=10,api_version=0,correlation_id=2,client_id=consumer-7}, body={group_id=DemoConsumer7}), createdTimeMs=1458605499605, sendTimeMs=1458605499611), responseBody={error_code=0,coordinator={node_id=1001,host=10.30.26.98,port=32774}})
- 2016-03-21 17:11:39.618 DEBUG KafkaConsumerExample [org.apache.kafka.clients.NetworkClient] - Initiating connection to node 2147482646 at 10.30.26.98:32774.
- 2016-03-21 17:11:39.619 DEBUG KafkaConsumerExample [s.consumer.internals.ConsumerCoordinator] - Revoking previously assigned partitions []
- 2016-03-21 17:11:39.619 DEBUG KafkaConsumerExample [s.consumer.internals.AbstractCoordinator] - (Re-)joining group DemoConsumer7
- 2016-03-21 17:11:39.621 DEBUG KafkaConsumerExample [s.consumer.internals.AbstractCoordinator] - Issuing request (JOIN_GROUP: {group_id=DemoConsumer7,session_timeout=30000,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=22 cap=22]}]}) to coordinator 2147482646
- 2016-03-21 17:11:39.623 DEBUG KafkaConsumerExample [org.apache.kafka.common.metrics.Metrics] - Added sensor with name node-2147482646.bytes-sent
- 2016-03-21 17:11:39.623 DEBUG KafkaConsumerExample [org.apache.kafka.common.metrics.Metrics] - Added sensor with name node-2147482646.bytes-received
- 2016-03-21 17:11:39.624 DEBUG KafkaConsumerExample [org.apache.kafka.common.metrics.Metrics] - Added sensor with name node-2147482646.latency
- 2016-03-21 17:11:39.624 DEBUG KafkaConsumerExample [org.apache.kafka.clients.NetworkClient] - Completed connection to node 2147482646
- 2016-03-21 17:11:39.631 DEBUG KafkaConsumerExample [s.consumer.internals.AbstractCoordinator] - Joined group: {error_code=0,generation_id=1,group_protocol=range,leader_id=consumer-7-91f534c8-4443-4c2e-990f-d1de045bc5ab,member_id=consumer-7-91f534c8-4443-4c2e-990f-d1de045bc5ab,members=[{member_id=consumer-7-91f534c8-4443-4c2e-990f-d1de045bc5ab,member_metadata=java.nio.HeapByteBuffer[pos=0 lim=22 cap=22]}]}
- 2016-03-21 17:11:39.631 DEBUG KafkaConsumerExample [s.consumer.internals.ConsumerCoordinator] - Performing range assignment for subscriptions {consumer-7-91f534c8-4443-4c2e-990f-d1de045bc5ab=org.apache.kafka.clients.consumer.internals.PartitionAssignor$Subscription@478e2443}
- 2016-03-21 17:11:39.632 DEBUG KafkaConsumerExample [s.consumer.internals.ConsumerCoordinator] - Finished assignment: {consumer-7-91f534c8-4443-4c2e-990f-d1de045bc5ab=org.apache.kafka.clients.consumer.internals.PartitionAssignor$Assignment@550c9d49}
- 2016-03-21 17:11:39.633 DEBUG KafkaConsumerExample [s.consumer.internals.AbstractCoordinator] - Issuing leader SyncGroup (SYNC_GROUP: {group_id=DemoConsumer7,generation_id=1,member_id=consumer-7-91f534c8-4443-4c2e-990f-d1de045bc5ab,group_assignment=[{member_id=consumer-7-91f534c8-4443-4c2e-990f-d1de045bc5ab,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=66 cap=66]}]}) to coordinator 2147482646
- 2016-03-21 17:11:39.639 DEBUG KafkaConsumerExample [s.consumer.internals.AbstractCoordinator] - Received successful sync group response for group DemoConsumer7: {error_code=0,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=66 cap=66]}
- 2016-03-21 17:11:39.641 DEBUG KafkaConsumerExample [s.consumer.internals.ConsumerCoordinator] - Setting newly assigned partitions [test-topic-8, test-topic-7, test-topic-2, test-topic-4, test-topic-3, test-topic-6, test-topic-1, test-topic-0, test-topic-5, test-topic-9]
- 2016-03-21 17:11:39.641 DEBUG KafkaConsumerExample [s.consumer.internals.ConsumerCoordinator] - Fetching committed offsets for partitions: [test-topic-8, test-topic-7, test-topic-2, test-topic-4, test-topic-3, test-topic-6, test-topic-1, test-topic-0, test-topic-5, test-topic-9]
- 2016-03-21 17:11:39.647 DEBUG KafkaConsumerExample [s.consumer.internals.ConsumerCoordinator] - No committed offset for partition test-topic-8
- 2016-03-21 17:11:39.647 DEBUG KafkaConsumerExample [s.consumer.internals.ConsumerCoordinator] - No committed offset for partition test-topic-7
- 2016-03-21 17:11:39.647 DEBUG KafkaConsumerExample [s.consumer.internals.ConsumerCoordinator] - No committed offset for partition test-topic-2
- 2016-03-21 17:11:39.647 DEBUG KafkaConsumerExample [s.consumer.internals.ConsumerCoordinator] - No committed offset for partition test-topic-4
- 2016-03-21 17:11:39.647 DEBUG KafkaConsumerExample [s.consumer.internals.ConsumerCoordinator] - No committed offset for partition test-topic-3
- 2016-03-21 17:11:39.648 DEBUG KafkaConsumerExample [s.consumer.internals.ConsumerCoordinator] - No committed offset for partition test-topic-6
- 2016-03-21 17:11:39.648 DEBUG KafkaConsumerExample [s.consumer.internals.ConsumerCoordinator] - No committed offset for partition test-topic-0
- 2016-03-21 17:11:39.648 DEBUG KafkaConsumerExample [s.consumer.internals.ConsumerCoordinator] - No committed offset for partition test-topic-1
- 2016-03-21 17:11:39.648 DEBUG KafkaConsumerExample [s.consumer.internals.ConsumerCoordinator] - No committed offset for partition test-topic-5
- 2016-03-21 17:11:39.648 DEBUG KafkaConsumerExample [s.consumer.internals.ConsumerCoordinator] - No committed offset for partition test-topic-9
- 2016-03-21 17:11:39.648 DEBUG KafkaConsumerExample [kafka.clients.consumer.internals.Fetcher] - Resetting offset for partition test-topic-8 to earliest offset.
- 2016-03-21 17:11:39.653 DEBUG KafkaConsumerExample [kafka.clients.consumer.internals.Fetcher] - Fetched offset 1243403 for partition test-topic-8
- 2016-03-21 17:11:39.653 DEBUG KafkaConsumerExample [kafka.clients.consumer.internals.Fetcher] - Resetting offset for partition test-topic-7 to earliest offset.
- 2016-03-21 17:11:39.656 DEBUG KafkaConsumerExample [kafka.clients.consumer.internals.Fetcher] - Fetched offset 1243467 for partition test-topic-7
- 2016-03-21 17:11:39.656 DEBUG KafkaConsumerExample [kafka.clients.consumer.internals.Fetcher] - Resetting offset for partition test-topic-2 to earliest offset.
- 2016-03-21 17:11:39.660 DEBUG KafkaConsumerExample [kafka.clients.consumer.internals.Fetcher] - Fetched offset 1243403 for partition test-topic-2
- 2016-03-21 17:11:39.660 DEBUG KafkaConsumerExample [kafka.clients.consumer.internals.Fetcher] - Resetting offset for partition test-topic-4 to earliest offset.
- 2016-03-21 17:11:39.664 DEBUG KafkaConsumerExample [kafka.clients.consumer.internals.Fetcher] - Fetched offset 1243512 for partition test-topic-4
- 2016-03-21 17:11:39.665 DEBUG KafkaConsumerExample [kafka.clients.consumer.internals.Fetcher] - Resetting offset for partition test-topic-3 to earliest offset.
- 2016-03-21 17:11:39.668 DEBUG KafkaConsumerExample [kafka.clients.consumer.internals.Fetcher] - Fetched offset 1243531 for partition test-topic-3
- 2016-03-21 17:11:39.668 DEBUG KafkaConsumerExample [kafka.clients.consumer.internals.Fetcher] - Resetting offset for partition test-topic-6 to earliest offset.
- 2016-03-21 17:11:39.672 DEBUG KafkaConsumerExample [kafka.clients.consumer.internals.Fetcher] - Fetched offset 1243538 for partition test-topic-6
- 2016-03-21 17:11:39.672 DEBUG KafkaConsumerExample [kafka.clients.consumer.internals.Fetcher] - Resetting offset for partition test-topic-0 to earliest offset.
- 2016-03-21 17:11:39.676 DEBUG KafkaConsumerExample [kafka.clients.consumer.internals.Fetcher] - Fetched offset 1243465 for partition test-topic-0
- 2016-03-21 17:11:39.676 DEBUG KafkaConsumerExample [kafka.clients.consumer.internals.Fetcher] - Resetting offset for partition test-topic-1 to earliest offset.
- 2016-03-21 17:11:39.679 DEBUG KafkaConsumerExample [kafka.clients.consumer.internals.Fetcher] - Fetched offset 1243369 for partition test-topic-1
- 2016-03-21 17:11:39.680 DEBUG KafkaConsumerExample [kafka.clients.consumer.internals.Fetcher] - Resetting offset for partition test-topic-5 to earliest offset.
- 2016-03-21 17:11:39.684 DEBUG KafkaConsumerExample [kafka.clients.consumer.internals.Fetcher] - Fetched offset 1243480 for partition test-topic-5
- 2016-03-21 17:11:39.684 DEBUG KafkaConsumerExample [kafka.clients.consumer.internals.Fetcher] - Resetting offset for partition test-topic-9 to earliest offset.
- 2016-03-21 17:11:39.688 DEBUG KafkaConsumerExample [kafka.clients.consumer.internals.Fetcher] - Fetched offset 1243441 for partition test-topic-9
- 2016-03-21 17:11:40.213 DEBUG KafkaConsumerExample [org.apache.kafka.common.metrics.Metrics] - Added sensor with name topic.test-topic.bytes-fetched
- 2016-03-21 17:11:40.213 DEBUG KafkaConsumerExample [org.apache.kafka.common.metrics.Metrics] - Added sensor with name topic.test-topic.records-fetched
done
- 2016-03-21 17:11:40.466 DEBUG KafkaConsumerExample [che.kafka.clients.consumer.KafkaConsumer] - Subscribed to topic(s): test-topic
- 2016-03-21 17:11:40.659 DEBUG KafkaConsumerExample [s.consumer.internals.ConsumerCoordinator] - Committed offset 1243403 for partition test-topic-8
- 2016-03-21 17:11:40.660 DEBUG KafkaConsumerExample [s.consumer.internals.ConsumerCoordinator] - Committed offset 1243467 for partition test-topic-7
- 2016-03-21 17:11:40.660 DEBUG KafkaConsumerExample [s.consumer.internals.ConsumerCoordinator] - Committed offset 1243403 for partition test-topic-2
- 2016-03-21 17:11:40.660 DEBUG KafkaConsumerExample [s.consumer.internals.ConsumerCoordinator] - Committed offset 1243512 for partition test-topic-4
- 2016-03-21 17:11:40.661 DEBUG KafkaConsumerExample [s.consumer.internals.ConsumerCoordinator] - Committed offset 1243531 for partition test-topic-3
- 2016-03-21 17:11:40.661 DEBUG KafkaConsumerExample [s.consumer.internals.ConsumerCoordinator] - Committed offset 1243538 for partition test-topic-6
- 2016-03-21 17:11:40.661 DEBUG KafkaConsumerExample [s.consumer.internals.ConsumerCoordinator] - Committed offset 1243465 for partition test-topic-0
- 2016-03-21 17:11:40.661 DEBUG KafkaConsumerExample [s.consumer.internals.ConsumerCoordinator] - Committed offset 1243369 for partition test-topic-1
- 2016-03-21 17:11:40.661 DEBUG KafkaConsumerExample [s.consumer.internals.ConsumerCoordinator] - Committed offset 1243480 for partition test-topic-5
- 2016-03-21 17:11:40.662 DEBUG KafkaConsumerExample [s.consumer.internals.ConsumerCoordinator] - Committed offset 1243441 for partition test-topic-9
done
- 2016-03-21 17:11:41.467 DEBUG KafkaConsumerExample [che.kafka.clients.consumer.KafkaConsumer] - Subscribed to topic(s): test-topic
- 2016-03-21 17:11:41.648 DEBUG KafkaConsumerExample [s.consumer.internals.ConsumerCoordinator] - Committed offset 1243403 for partition test-topic-8
- 2016-03-21 17:11:41.649 DEBUG KafkaConsumerExample [s.consumer.internals.ConsumerCoordinator] - Committed offset 1243467 for partition test-topic-7
- 2016-03-21 17:11:41.649 DEBUG KafkaConsumerExample [s.consumer.internals.ConsumerCoordinator] - Committed offset 1243403 for partition test-topic-2
- 2016-03-21 17:11:41.649 DEBUG KafkaConsumerExample [s.consumer.internals.ConsumerCoordinator] - Committed offset 1243512 for partition test-topic-4
- 2016-03-21 17:11:41.649 DEBUG KafkaConsumerExample [s.consumer.internals.ConsumerCoordinator] - Committed offset 1243531 for partition test-topic-3
- 2016-03-21 17:11:41.649 DEBUG KafkaConsumerExample [s.consumer.internals.ConsumerCoordinator] - Committed offset 1243538 for partition test-topic-6
- 2016-03-21 17:11:41.649 DEBUG KafkaConsumerExample [s.consumer.internals.ConsumerCoordinator] - Committed offset 1243465 for partition test-topic-0
- 2016-03-21 17:11:41.650 DEBUG KafkaConsumerExample [s.consumer.internals.ConsumerCoordinator] - Committed offset 1243369 for partition test-topic-1
- 2016-03-21 17:11:41.650 DEBUG KafkaConsumerExample [s.consumer.internals.ConsumerCoordinator] - Committed offset 1243480 for partition test-topic-5
- 2016-03-21 17:11:41.650 DEBUG KafkaConsumerExample [s.consumer.internals.ConsumerCoordinator] - Committed offset 1243441 for partition test-topic-9
done

I tried poll(0), seekToBeginning but nothing works:

consumer.poll(0);
consumer.seekToBeginning();
consumer.poll(0);

The topic always shows there are many messages:

$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.35.25.95:32774 --topic test-topic --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}'
12434609

$KAFKA_HOME/bin/kafka-consumer-groups.sh --new-consumer --describe --group DemoConsumer15 --bootstrap-server 10.35.25.95:32774
GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER
DemoConsumer15, test-topic, 0, 1243465, 1381871, 138406, consumer-15_/10.2...
DemoConsumer15, test-topic, 1, 1243369, 1381771, 138402, consumer-15_/10.2...
DemoConsumer15, test-topic, 2, 1243403, 1381762, 138359, consumer-15_/10.2...
DemoConsumer15, test-topic, 3, 1243531, 1382003, 138472, consumer-15_/10.2...
DemoConsumer15, test-topic, 4, 1243512, 1381979, 138467, consumer-15_/10.2...
DemoConsumer15, test-topic, 5, 1243480, 1381880, 138400, consumer-15_/10.2...

-- Kisna