I am using SimpleConsumer and checked out the code from your repo 1.5 weeks back, did you push anything post that?
Thanks, Sourabh On Thursday, August 6, 2015, Rajasekar Elango <rela...@salesforce.com> wrote: > Hi Sourabh, > > We have seen this error, if kafka broker was running with SSL on Consumer > is trying to consumer in plaintext mode. Are you using high level consumer > or SimpleConsumer..? If you using using SimpleConsumer, pull latest code > from my repo < > https://github.com/relango/kafka/commits/kafka_security_0.8.2> > and > pass secure parameters to SimpleConsumer constructor. > > Thanks, > Raja. > > On Thu, Aug 6, 2015 at 9:01 PM, Sourabh Chandak <sourabh3...@gmail.com > <javascript:;>> > wrote: > > > Hi, > > > > I am trying to integrate > > https://github.com/relango/kafka/tree/kafka_security_0.8.2 with Spark > > Streaming using the SimpleConsumer. I know that the SSL patch is on its > way > > but need to set up a prototype hence went ahead with Raja's version. > > > > So when I run my spark job to retrieve data from 1 topic with just 1 > > partition I get a OutOfMemoryError. > > > > Here is the stack trace: > > > > Exception in thread "main" java.lang.OutOfMemoryError: Java heap space > > at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) > > at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) > > at > > > > > kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80) > > at > > > > > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63) > > at kafka.network.Receive$class.readCompletely(Transmission.scala:56) > > at > > > > > kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) > > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111) > > at > kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:91) > > at > > > > > kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:80) > > at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:103) > > at > > > > > org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:126) > > at > > > > > org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:125) > > at > > > > > org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:346) > > at > > > > > org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342) > > at > > > > > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > > at > scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) > > at org.apache.spark.streaming.kafka.KafkaCluster.org > > > > > $apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342) > > at > > > > > org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125) > > at > > > > > org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112) > > at > > > > > org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:310) > > > > Need help from experts to resolve this. > > > > Thanks, > > Sourabh > > > > > > -- > Thanks, > Raja. >