Hi,

I am trying to integrate
https://github.com/relango/kafka/tree/kafka_security_0.8.2 with Spark
Streaming using the SimpleConsumer. I know that the SSL patch is on its
way, but I need to set up a prototype, so I went ahead with Raja's version.

When I run my Spark job to retrieve data from 1 topic with just 1
partition, I get an OutOfMemoryError.

Here is the stack trace:

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:91)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:80)
    at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:103)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:126)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:125)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:346)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
    at org.apache.spark.streaming.kafka.KafkaCluster.org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
    at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
    at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:310)
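
A note on reading the trace: the allocation that fails happens in
BoundedByteBufferReceive.readFrom, which appears to size its buffer from a
4-byte length prefix at the start of the broker's response. One possible
explanation, which is only an assumption and not something confirmed here,
is that the bytes coming back are not a plain Kafka response (for example,
a TLS record from an SSL-enabled port), so the prefix decodes to a very
large number. The sketch below uses plain Java and a hypothetical class
name, SizePrefixSketch, to show that arithmetic. The sample bytes are
illustrative and were not captured from this setup.

```java
import java.nio.ByteBuffer;

public class SizePrefixSketch {
    // Reads the first four bytes as a big-endian int, the way a reader of a
    // size-prefixed protocol would before allocating a buffer for the body.
    static int decodeSizePrefix(byte[] firstFourBytes) {
        return ByteBuffer.wrap(firstFourBytes, 0, 4).getInt();
    }

    public static void main(String[] args) {
        // Assumption: illustrative leading bytes of a TLS alert record
        // (content type 0x15, version 0x0303, first length byte 0x00).
        byte[] tlsLike = {0x15, 0x03, 0x03, 0x00};
        int size = decodeSizePrefix(tlsLike);
        // 0x15030300 = 352518912, roughly 336 MiB for a single buffer.
        System.out.println("decoded size = " + size + " bytes");
    }
}
```

If the heap is smaller than the decoded size, ByteBuffer.allocate would be
expected to fail in the same way as in the trace above. Whether this is
what actually happens here would need to be checked against the broker
configuration.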

I would appreciate any help in resolving this.

Thanks,
Sourabh
