I have one more question. The documentation says: The Kafka cluster retains all published messages, whether or not they have been consumed, for a configurable period of time. For example, if the log retention is set to two days, *then for the two days after a message is published it is available for consumption, after which it will be discarded to free up space*. Kafka's performance is effectively constant with respect to data size, so retaining lots of data is not a problem.
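To check that I am reading the documentation quoted above correctly, here is a small sketch of the retention rule as I understand it. It is not Kafka code: the class and method names are made up for illustration, and it only models when a message becomes eligible for deletion. As far as I know, real brokers remove whole log segments, so the actual removal can happen somewhat later.

```java
import java.util.concurrent.TimeUnit;

// Illustrative model of time-based retention; hypothetical names, not Kafka's API.
class RetentionSketch {

    // Returns true while a message published at publishedAtMs is still inside
    // the retention window at time nowMs (i.e. not yet eligible for deletion).
    static boolean isStillAvailable(long publishedAtMs, long nowMs, long retentionMs) {
        return nowMs - publishedAtMs < retentionMs;
    }

    public static void main(String[] args) {
        long retentionMs = TimeUnit.DAYS.toMillis(2); // the "two days" from the example
        long publishedAtMs = 0L;

        // Consumer comes back after one day: the message can still be read.
        System.out.println(isStillAvailable(publishedAtMs, TimeUnit.DAYS.toMillis(1), retentionMs)); // true

        // Consumer comes back after three days: the message is past retention.
        System.out.println(isStillAvailable(publishedAtMs, TimeUnit.DAYS.toMillis(3), retentionMs)); // false
    }
}
```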
*So if my consumer is down for longer than this retention period, it will never receive those messages. How can I set the retention time to be indefinite?*

On Tue, Apr 15, 2014 at 12:34 PM, ankit tyagi <ankittyagi.mn...@gmail.com> wrote:

> I have increased the number of partitions for parallelism, but my concern is:
> if messages are present in the partitions, why are the consumer threads
> blocked on the condition below?
>
> "KafkaConsumer-24" prio=10 tid=0x00007f6da5726800 nid=0x3c8a waiting on condition [0x00007f6d4f8f7000]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for <0x00000006c11ce780> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>
> On Tue, Apr 15, 2014 at 11:53 AM, Arjun <ar...@socialtwist.com> wrote:
>
>> Hi,
>>
>> Can you please check whether this is the situation:
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Myconsumerseemstohavestopped,why?
>>
>> Arjun Kota
>>
>> On Tuesday 15 April 2014 11:49 AM, ankit tyagi wrote:
>>
>> Hi.
>>
>> Currently we are using *kafka_2.8.0-0.8.0-beta1* and the high-level
>> consumer group to consume messages. The topic was created with 3 replicas
>> and 100 partitions so that at most 100 threads can consume messages
>> simultaneously, but I am seeing that most threads are in a waiting state
>> and the lag keeps increasing.
>>
>> Below is the output of *kafka-run-class.sh kafka.tools.ConsumerOffsetChecker*:
>>
>> Group  Topic  Pid  Offset  logSize  Lag
>>     Owner
>>
>> kafka.coms.consumer.kafka_topic_coms_esb_prod_coms.coms-timemachine.coms.coms04.snapdeal.com  kafka_topic_coms_esb_prod_coms  0  1030308  1081657  51349
>>     kafka.coms.consumer.kafka_topic_coms_esb_prod_coms.coms-timemachine.coms.coms04.snapdeal.com_coms04.snapdeal.com-1397482483587-832c7c25-0
>> kafka.coms.consumer.kafka_topic_coms_esb_prod_coms.coms-timemachine.coms.coms04.snapdeal.com  kafka_topic_coms_esb_prod_coms  1  348475  460598  112123
>>     kafka.coms.consumer.kafka_topic_coms_esb_prod_coms.coms-timemachine.coms.coms04.snapdeal.com_coms04.snapdeal.com-1397482483587-832c7c25-0
>> kafka.coms.consumer.kafka_topic_coms_esb_prod_coms.coms-timemachine.coms.coms04.snapdeal.com  kafka_topic_coms_esb_prod_coms  2  296231  461154  164923
>>     kafka.coms.consumer.kafka_topic_coms_esb_prod_coms.coms-timemachine.coms.coms04.snapdeal.com_coms04.snapdeal.com-1397482483587-832c7c25-1
>> kafka.coms.consumer.kafka_topic_coms_esb_prod_coms.coms-timemachine.coms.coms04.snapdeal.com  kafka_topic_coms_esb_prod_coms  3  1040848  1094389  53541
>>     kafka.coms.consumer.kafka_topic_coms_esb_prod_coms.coms-timemachine.coms.coms04.snapdeal.com_coms04.snapdeal.com-1397482483587-832c7c25-1
>> kafka.coms.consumer.kafka_topic_coms_esb_prod_coms.coms-timemachine.coms.coms04.snapdeal.com  kafka_topic_coms_esb_prod_coms  4  398706  460787  62081
>>     kafka.coms.consumer.kafka_topic_coms_esb_prod_coms.coms-timemachine.coms.coms04.snapdeal.com_coms04.snapdeal.com-1397482483587-832c7c25-10
>> kafka.coms.consumer.kafka_topic_coms_esb_prod_coms.coms-timemachine.coms.coms04.snapdeal.com  kafka_topic_coms_esb_prod_coms  5  330990  461085  130095
>>     kafka.coms.consumer.kafka_topic_coms_esb_prod_coms.coms-timemachine.coms.coms04.snapdeal.com_coms04.snapdeal.com-1397482483587-832c7c25-10
>> kafka.coms.consumer.kafka_topic_coms_esb_prod_coms.coms-timemachine.coms.coms04.snapdeal.com  kafka_topic_coms_esb_prod_coms  6  1046217  1093952  47735
>>     kafka.coms.consumer.kafka_topic_coms_esb_prod_coms.coms-timemachine.coms.coms04.snapdeal.com_coms04.snapdeal.com-1397482483587-832c7c25-11
>> kafka.coms.consumer.kafka_topic_coms_esb_prod_coms.coms-timemachine.coms.coms04.snapdeal.com  kafka_topic_coms_esb_prod_coms  7  348592  461147  112555
>>     kafka.coms.consumer.kafka_topic_coms_esb_prod_coms.coms-timemachine.coms.coms04.snapdeal.com_coms04.snapdeal.com-1397482483587-832c7c25-11
>> kafka.coms.consumer.kafka_topic_coms_esb_prod_coms.coms-timemachine.coms.coms04.snapdeal.com  kafka_topic_coms_esb_prod_coms  8  341237  460094  118857
>>     kafka.coms.consumer.kafka_topic_coms_esb_prod_coms.coms-timemachine.coms.coms04.snapdeal.com_coms04.snapdeal.com-1397482483587-832c7c25-12
>> kafka.coms.consumer.kafka_topic_coms_esb_prod_coms.coms-timemachine.coms.coms04.snapdeal.com  kafka_topic_coms_esb_prod_coms  9  1046635  1094724  48089
>>     kafka.coms.consumer.kafka_topic_coms_esb_prod_coms.coms-timemachine.coms.coms04.snapdeal.com_coms04.snapdeal.com-1397482483587-832c7c25-12
>> kafka.coms.consumer.kafka_topic_coms_esb_prod_coms.coms-timemachine.coms.coms04.snapdeal.com  kafka_topic_coms_esb_prod_coms  10  407063  461124  54061
>>     kafka.coms.consumer.kafka_topic_coms_esb_prod_coms.coms-timemachine.coms.coms04.snapdeal.com_coms04.snapdeal.com-1397482483587-832c7c25-13
>> kafka.coms.consumer.kafka_topic_coms_esb_prod_coms.coms-timemachine.coms.coms04.snapdeal.com  kafka_topic_coms_esb_prod_coms  11  329498  460666  131168
>>     kafka.coms.consumer.kafka_topic_coms_esb_prod_coms.coms-timemachine.coms.coms04.snapdeal.com_coms04.snapdeal.com-1397482483587-832c7c25-13
>>
>> Below is the output of the *consumer thread group's thread dump*:
>>
>> "KafkaConsumer-24" prio=10 tid=0x00007f6da5726800 nid=0x3c8a waiting on condition [0x00007f6d4f8f7000]
>>    java.lang.Thread.State: WAITING (parking)
>>     at sun.misc.Unsafe.park(Native Method)
>>     - parking to wait for <0x00000006c11ce780> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
>>     at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
>>     at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:63)
>>     at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
>>     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:61)
>>     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:53)
>>     at com.snapdeal.coms.kafka.KafkaEventListenerContainer$KafkaConsumer.safeRun(KafkaEventListenerContainer.java:141)
>>     at com.snapdeal.coms.kafka.KafkaEventListenerContainer$KafkaConsumer.run(KafkaEventListenerContainer.java:175)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>     at java.lang.Thread.run(Thread.java:722)
>>
>> "KafkaConsumer-23" prio=10 tid=0x00007f6da5724000 nid=0x3c89 waiting on condition [0x00007f6d4f9f8000]
>>    java.lang.Thread.State: WAITING (parking)
>>     at sun.misc.Unsafe.park(Native Method)
>>     - parking to wait for <0x00000006c11cde90> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
>>     at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
>>     at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:63)
>>     at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
>>     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:61)
>>     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:53)
>>     at com.snapdeal.coms.kafka.KafkaEventListenerContainer$KafkaConsumer.safeRun(KafkaEventListenerContainer.java:141)
>>     at com.snapdeal.coms.kafka.KafkaEventListenerContainer$KafkaConsumer.run(KafkaEventListenerContainer.java:175)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>
>> *I am not sure why my consumer threads are waiting on this condition if
>> messages are available in the Kafka partitions.*
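A note on how I read the thread dump quoted above: the threads are parked inside LinkedBlockingQueue.take(), reached from ConsumerIterator.makeNext. The JDK-only sketch below is not Kafka code, and the class and method names are made up for illustration. It only shows that a thread blocked in take() on an empty queue stays parked until something is enqueued, while poll with a timeout gives up and returns null. If the same holds for the consumer's internal queue, the threads would look exactly like this whenever nothing is being handed to them, even though the broker still holds messages. That is my interpretation of the trace, not a confirmed diagnosis.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// JDK-only illustration; the queue stands in for whatever queue the iterator reads from.
class BlockingQueueSketch {

    // Waits up to timeoutMs for an element; returns null if none arrived in time
    // or if the waiting thread was interrupted.
    static String pollOnce(LinkedBlockingQueue<String> queue, long timeoutMs) {
        try {
            return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
    }

    public static void main(String[] args) throws Exception {
        final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>();

        Thread consumer = new Thread(new Runnable() {
            public void run() {
                try {
                    queue.take(); // parks here until an element is enqueued
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "SketchConsumer");
        consumer.setDaemon(true);
        consumer.start();

        Thread.sleep(200);
        // While the queue is empty the thread is normally reported as WAITING.
        System.out.println("state while queue is empty: " + consumer.getState());

        queue.offer("chunk");   // hand the consumer one element
        consumer.join(1000);    // take() returns, so the thread can finish
        System.out.println("still alive after offer: " + consumer.isAlive());

        // A bounded wait on the now-empty queue returns null instead of blocking forever.
        System.out.println("poll with timeout: " + pollOnce(queue, 100));
    }
}
```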