Hi, I am trying to integrate https://github.com/relango/kafka/tree/kafka_security_0.8.2 with Spark Streaming using the SimpleConsumer. I know that the SSL patch is on its way but need to set up a prototype hence went ahead with Raja's version.
So when I run my spark job to retrieve data from 1 topic with just 1 partition I get a OutOfMemoryError. Here is the stack trace: Exception in thread "main" java.lang.OutOfMemoryError: Java heap space at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63) at kafka.network.Receive$class.readCompletely(Transmission.scala:56) at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111) at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:91) at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:80) at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:103) at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:126) at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:125) at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:346) at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at org.apache.spark.streaming.kafka.KafkaCluster.org $apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342) at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125) at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112) at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:310) Need help from experts to resolve this. Thanks, Sourabh