I think this is a known issue with the existing KafkaUtils; we hit it as well. The problem is that the existing KafkaUtils receiver gives you no way to control the message flow, so a fast Kafka topic can push data into the receiver quicker than the job can process it and eventually exhaust executor memory. A rough sketch of the usual mitigations is below.
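Since there is no back-pressure in the receiver-based API, the common workarounds are to persist received blocks with a serialized, disk-spilling storage level and, where your Spark version supports it, cap the per-receiver ingest rate. This is only a minimal sketch, not the poster's actual job; the ZooKeeper quorum, group id, topic name, batch interval and rate value are made-up placeholders.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ThrottledKafkaStream {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("logStreamNormalizer")
      // If your Spark version supports it, cap records/sec per receiver so a
      // burst from Kafka cannot outrun processing and fill the heap.
      .set("spark.streaming.receiver.maxRate", "10000")

    val ssc = new StreamingContext(conf, Seconds(10))

    // MEMORY_AND_DISK_SER_2 lets received blocks spill to disk instead of
    // piling up as deserialized objects on the executor heap.
    val stream = KafkaUtils.createStream(
      ssc,
      "zkhost:2181",           // ZooKeeper quorum (placeholder)
      "normalizer",            // consumer group id (placeholder)
      Map("logs" -> 4),        // topic -> number of consumer threads (placeholder)
      StorageLevel.MEMORY_AND_DISK_SER_2)

    stream.map(_._2).print()   // stand-in for the real normalize-and-write logic

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Even with these settings the receiver has no true back-pressure, which is what motivated the low-level consumer mentioned below.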
You can refer to another mail thread on the Low Level Kafka Consumer, which I have written to solve this issue along with several others.

Dib

On Aug 28, 2014 6:26 AM, "Tim Smith" <secs...@gmail.com> wrote:
> Hi,
>
> I have Spark (1.0.0 on CDH5) running with Kafka 0.8.1.1.
>
> I have a streaming job that reads from a Kafka topic and writes
> output to another Kafka topic. The job starts fine but after a while
> the input stream stops getting any data. I think these messages show
> no incoming data on the stream:
> 14/08/28 00:42:15 INFO ReceiverTracker: Stream 0 received 0 blocks
>
> I run the job as:
> spark-submit --class logStreamNormalizer --master yarn
> log-stream-normalizer_2.10-1.0.jar --jars
> spark-streaming-kafka_2.10-1.0.2.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
> --executor-memory 6G --spark.cleaner.ttl 60 --executor-cores 4
>
> As soon as I start the job, I see an error like:
>
> 14/08/28 00:50:59 INFO BlockManagerInfo: Added input-0-1409187056800
> in memory on node6-acme.com:39418 (size: 83.3 MB, free: 3.1 GB)
> Exception in thread "pool-1-thread-7" java.lang.OutOfMemoryError: Java heap space
>         at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>         at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>         at org.apache.spark.storage.BlockMessage.set(BlockMessage.scala:85)
>         at org.apache.spark.storage.BlockMessage$.fromByteBuffer(BlockMessage.scala:176)
>         at org.apache.spark.storage.BlockMessageArray.set(BlockMessageArray.scala:63)
>         at org.apache.spark.storage.BlockMessageArray$.fromBufferMessage(BlockMessageArray.scala:109)
>         at org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:42)
>         at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34)
>         at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34)
>         at org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:662)
>         at org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:504)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
>
> But I am not sure if that is the cause, because even after that OOM message
> I still see data coming in:
> 14/08/28 00:51:00 INFO ReceiverTracker: Stream 0 received 6 blocks
>
> I would appreciate any pointers or suggestions to troubleshoot the issue.
>
> Thanks