I am using SimpleConsumer and checked out the code from your repo 1.5 weeks
back, did you push anything post that?

Thanks,
Sourabh

On Thursday, August 6, 2015, Rajasekar Elango <rela...@salesforce.com>
wrote:

> Hi Sourabh,
>
> We have seen this error, if kafka broker was running with SSL on Consumer
> is trying to consumer in plaintext mode. Are you using high level consumer
> or SimpleConsumer..? If you using using SimpleConsumer, pull latest code
> from my repo <
> https://github.com/relango/kafka/commits/kafka_security_0.8.2>
> and
> pass secure parameters to SimpleConsumer constructor.
>
> Thanks,
> Raja.
>
> On Thu, Aug 6, 2015 at 9:01 PM, Sourabh Chandak <sourabh3...@gmail.com
> <javascript:;>>
> wrote:
>
> > Hi,
> >
> > I am trying to integrate
> > https://github.com/relango/kafka/tree/kafka_security_0.8.2 with Spark
> > Streaming using the SimpleConsumer. I know that the SSL patch is on its
> way
> > but need to set up a prototype hence went ahead with Raja's version.
> >
> > So when I run my spark job to retrieve data from 1 topic with just 1
> > partition I get a OutOfMemoryError.
> >
> > Here is the stack trace:
> >
> > Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
> >     at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
> >     at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
> >     at
> >
> >
> kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
> >     at
> >
> >
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
> >     at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> >     at
> >
> >
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> >     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
> >     at
> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:91)
> >     at
> >
> >
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:80)
> >     at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:103)
> >     at
> >
> >
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:126)
> >     at
> >
> >
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:125)
> >     at
> >
> >
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:346)
> >     at
> >
> >
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
> >     at
> >
> >
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> >     at
> scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
> >     at org.apache.spark.streaming.kafka.KafkaCluster.org
> >
> >
> $apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
> >     at
> >
> >
> org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
> >     at
> >
> >
> org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
> >     at
> >
> >
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:310)
> >
> > Need help from experts to resolve this.
> >
> > Thanks,
> > Sourabh
> >
>
>
>
> --
> Thanks,
> Raja.
>

Reply via email to