Hi Max,

Is it possible for you to try the Kafka low-level consumer I have written, which is also part of Spark Packages? This consumer is also a reliable consumer, with no data loss on receiver failure. I have tested it with Spark 1.2 with spark.streaming.receiver.writeAheadLog.enable set to "true", and the consumer pulls messages from Kafka and stores them in the WAL.
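For reference, this is roughly all the configuration that enabling the WAL takes in Spark 1.2. A minimal sketch; the app name, batch interval, and checkpoint path below are placeholders I chose for illustration:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setAppName("wal-test") // placeholder app name
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val ssc = new StreamingContext(conf, Seconds(10))
    // WAL segments are written under the checkpoint directory, so it must
    // be on a fault-tolerant filesystem such as HDFS; placeholder path.
    ssc.checkpoint("hdfs:///tmp/wal-checkpoint")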
I have tested this consumer with Spark 1.2 with the WAL feature enabled against a large Kafka backlog (around 6 million messages), and it pulled everything without any issue. Note that with the WAL feature enabled, throughput will be impacted.

You can find the consumer here: https://github.com/dibbhatt/kafka-spark-consumer

Here is its reference in spark-packages: http://spark-packages.org/package/5

If you have any issue configuring it, you can reach me.

Regards,
Dibyendu

On Tue, Jan 13, 2015 at 1:40 AM, Max Xu <max...@twosigma.com> wrote:

> Hi all,
>
> I am running a Spark streaming application with ReliableKafkaReceiver
> (Spark 1.2.0). I was constantly getting the following exception:
>
> 15/01/12 19:07:06 ERROR receiver.BlockGenerator: Error in block pushing thread
> java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>         at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>         at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>         at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>         at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>         at scala.concurrent.Await$.result(package.scala:107)
>         at org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:176)
>         at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:160)
>         at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:126)
>         at org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:124)
>         at org.apache.spark.streaming.kafka.ReliableKafkaReceiver.org$apache$spark$streaming$kafka$ReliableKafkaReceiver$$storeBlockAndCommitOffset(ReliableKafkaReceiver.scala:207)
>         at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$GeneratedBlockHandler.onPushBlock(ReliableKafkaReceiver.scala:275)
>         at org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:181)
>         at org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:154)
>         at org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:86)
>
> After the exception, ReliableKafkaReceiver stayed in ACTIVE status but
> stopped receiving data from Kafka.
> The Kafka message handler thread is in BLOCKED state:
>
> Thread 92: KafkaMessageHandler-0 (BLOCKED)
>         org.apache.spark.streaming.receiver.BlockGenerator.addDataWithCallback(BlockGenerator.scala:123)
>         org.apache.spark.streaming.kafka.ReliableKafkaReceiver.org$apache$spark$streaming$kafka$ReliableKafkaReceiver$$storeMessageAndMetadata(ReliableKafkaReceiver.scala:185)
>         org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:247)
>         java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>         java.util.concurrent.FutureTask.run(FutureTask.java:262)
>         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         java.lang.Thread.run(Thread.java:745)
>
> Sometimes when the exception was thrown, I also saw warning messages like this:
>
> 15/01/12 01:08:07 WARN hdfs.DFSClient: Slow ReadProcessor read fields took 30533ms (threshold=30000ms); ack: seqno: 113 status: SUCCESS status: SUCCESS downstreamAckTimeNanos: 30524893062, targets: [172.20.xxx.xxx:50010, 172.20.xxx.xxx:50010]
> 15/01/12 01:08:07 WARN hdfs.DFSClient: Slow waitForAckedSeqno took 30526ms (threshold=30000ms)
>
> In the past, I never had such a problem with KafkaReceiver. What causes
> this exception? How can I solve this problem?
>
> Thanks in advance,
> Max
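A note for anyone who hits the same trace: the "Futures timed out after [30 seconds]" comes from the Await.result call inside WriteAheadLogBasedBlockHandler.storeBlock, and the slow HDFS ack warnings above (roughly 30.5 s against a 30 s threshold) suggest the WAL write to HDFS is simply outrunning that timeout. In the Spark 1.2 source this timeout appears to be controlled by the undocumented setting spark.streaming.receiver.blockStoreTimeout (in seconds); treat the key name as an assumption to verify against your Spark version. A hedged sketch of raising it:

    import org.apache.spark.SparkConf

    // Assumption: spark.streaming.receiver.blockStoreTimeout is read (in
    // seconds, default 30) by WriteAheadLogBasedBlockHandler in Spark 1.2.
    // Verify the key exists in your Spark version before relying on it.
    val conf = new SparkConf()
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
      .set("spark.streaming.receiver.blockStoreTimeout", "90")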