Hi Max,

Could you try the Kafka low-level consumer I have written, which is also
available on Spark Packages? It is a reliable consumer with no data loss
on receiver failure. I have tested it with Spark 1.2 with
spark.streaming.receiver.writeAheadLog.enable set to "true", and it pulls
messages from Kafka and stores them in the WAL.
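
In case it helps, here is a minimal sketch of the driver-side setup for
enabling the WAL; the app name, batch interval and checkpoint path are
placeholders, not values from any particular deployment:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Enable the receiver write-ahead log (Spark 1.2+); a checkpoint
    // directory is also required so the WAL has somewhere to write.
    val conf = new SparkConf()
      .setAppName("KafkaWALExample") // placeholder app name
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val ssc = new StreamingContext(conf, Seconds(10)) // placeholder batch interval
    ssc.checkpoint("hdfs:///tmp/spark-checkpoint")    // placeholder checkpoint dir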

I have tested this consumer with Spark 1.2 with the WAL feature enabled
against a large Kafka backlog (around 6 million messages), and it pulls
without any issue.

With the WAL feature enabled, throughput will be impacted.

You can find the consumer here:
https://github.com/dibbhatt/kafka-spark-consumer

Here is its entry on Spark Packages:
http://spark-packages.org/package/5
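
To give you an idea of the wiring, below is a rough sketch of how the
consumer plugs into a streaming job (reusing the ssc from the sketch
above). The ReceiverLauncher entry point, its package path, and the
property names are written from my memory of the project README, so
please treat them as placeholders and verify against the README of the
version you pick up:

    import java.util.Properties
    import org.apache.spark.storage.StorageLevel
    import consumer.kafka.ReceiverLauncher // package path from memory; may differ by version

    // Placeholder ZooKeeper/Kafka coordinates; replace with your cluster's values.
    val props = new Properties()
    props.put("zookeeper.hosts", "zkhost1,zkhost2")
    props.put("zookeeper.port", "2181")
    props.put("kafka.topic", "your-topic")
    props.put("kafka.consumer.id", "your-consumer-id")

    // Launch N receivers and get back a unified DStream of Kafka messages
    // (ReceiverLauncher.launch is the entry point as I recall it from the README).
    val numberOfReceivers = 3
    val stream = ReceiverLauncher.launch(ssc, props, numberOfReceivers,
      StorageLevel.MEMORY_ONLY)
    stream.foreachRDD(rdd => println("Pulled " + rdd.count() + " messages"))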

If you run into any issues configuring it, feel free to reach out to me.

Regards,
Dibyendu

On Tue, Jan 13, 2015 at 1:40 AM, Max Xu <max...@twosigma.com> wrote:

> Hi all,
>
> I am running a Spark Streaming application with ReliableKafkaReceiver
> (Spark 1.2.0). I was constantly getting the following exception:
>
> 15/01/12 19:07:06 ERROR receiver.BlockGenerator: Error in block pushing thread
>
> java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>         at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>         at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>         at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>         at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>         at scala.concurrent.Await$.result(package.scala:107)
>         at org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:176)
>         at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:160)
>         at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:126)
>         at org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:124)
>         at org.apache.spark.streaming.kafka.ReliableKafkaReceiver.org$apache$spark$streaming$kafka$ReliableKafkaReceiver$$storeBlockAndCommitOffset(ReliableKafkaReceiver.scala:207)
>         at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$GeneratedBlockHandler.onPushBlock(ReliableKafkaReceiver.scala:275)
>         at org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:181)
>         at org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:154)
>         at org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:86)
>
>
> After the exception, ReliableKafkaReceiver stayed in ACTIVE status but
> stopped receiving data from Kafka. The Kafka message handler thread was in
> the BLOCKED state:
>
>
> Thread 92: KafkaMessageHandler-0 (BLOCKED)
> org.apache.spark.streaming.receiver.BlockGenerator.addDataWithCallback(BlockGenerator.scala:123)
> org.apache.spark.streaming.kafka.ReliableKafkaReceiver.org$apache$spark$streaming$kafka$ReliableKafkaReceiver$$storeMessageAndMetadata(ReliableKafkaReceiver.scala:185)
> org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:247)
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> java.util.concurrent.FutureTask.run(FutureTask.java:262)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> java.lang.Thread.run(Thread.java:745)
>
>
> Sometimes when the exception was thrown, I also saw warning messages like
> these:
>
> 15/01/12 01:08:07 WARN hdfs.DFSClient: Slow ReadProcessor read fields took 30533ms (threshold=30000ms); ack: seqno: 113 status: SUCCESS status: SUCCESS downstreamAckTimeNanos: 30524893062, targets: [172.20.xxx.xxx:50010, 172.20.xxx.xxx:50010]
> 15/01/12 01:08:07 WARN hdfs.DFSClient: Slow waitForAckedSeqno took 30526ms (threshold=30000ms)
>
>
> In the past, I never had such a problem with KafkaReceiver. What causes
> this exception? How can I solve it?
>
>
> Thanks in advance,
>
> Max
>
