When you say 0.10.1, do you mean the broker version only, or does your
assembly contain classes from the 0.10.1 Kafka consumer?
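
If you're not sure, it's worth checking what actually ends up in the
assembly. A minimal sketch of the sbt dependencies I'd expect for this
setup (versions are assumptions, match them to your build):

// build.sbt -- sketch only, versions are assumptions
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "2.0.1" % Provided,
  // spark-streaming-kafka-0-10 brings in its own kafka-clients; make sure
  // nothing else on the classpath pulls in a different kafka-clients version
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.0.1"
)

Something like the sbt-dependency-graph plugin will tell you which
kafka-clients version actually wins in the assembly.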

On Fri, Dec 9, 2016 at 10:19 AM, debasishg <ghosh.debas...@gmail.com> wrote:
> Hello -
>
> I am facing some issues with the following snippet of code that reads from
> Kafka and creates a DStream. I am using KafkaUtils.createDirectStream(..) with
> Kafka 0.10.1 and Spark 2.0.1.
>
> import org.apache.kafka.clients.consumer.ConsumerRecord
> import org.apache.spark.mllib.linalg.{Vector, Vectors}
> import org.apache.spark.streaming.dstream.DStream
> import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
> import org.apache.spark.streaming.kafka010.KafkaUtils
> import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
>
> // get the data from Kafka
> val stream: DStream[ConsumerRecord[Array[Byte], (String, String)]] =
>   KafkaUtils.createDirectStream[Array[Byte], (String, String)](
>     streamingContext,
>     PreferConsistent,
>     Subscribe[Array[Byte], (String, String)](topicToReadFrom, kafkaParams)
>   )
>
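> For completeness: since the value type is (String, String), kafkaParams
> registers a custom value deserializer. A rough sketch of both follows; the
> bootstrap.servers, group.id, and the tab-separated wire format are
> placeholders for illustration, not my real config:
>
> import java.nio.charset.StandardCharsets
> import org.apache.kafka.common.serialization.{ByteArrayDeserializer, Deserializer}
>
> // sketch: turns "label<TAB>value" bytes into a (label, value) pair
> class LabelledValueDeserializer extends Deserializer[(String, String)] {
>   override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()
>   override def deserialize(topic: String, data: Array[Byte]): (String, String) = {
>     val Array(label, value) = new String(data, StandardCharsets.UTF_8).split("\t", 2)
>     (label, value)
>   }
>   override def close(): Unit = ()
> }
>
> val kafkaParams = Map[String, Object](
>   "bootstrap.servers" -> "localhost:9092",          // placeholder
>   "key.deserializer" -> classOf[ByteArrayDeserializer],
>   "value.deserializer" -> classOf[LabelledValueDeserializer],
>   "group.id" -> "pca-stream",                       // placeholder
>   "auto.offset.reset" -> "latest"
> )
>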
> // label and vectorize the value
> val projected: DStream[(String, Vector)] = stream.map { record =>
>   val (label, value) = record.value
>   val vector = Vectors.dense(value.split(",").map(_.toDouble))
>   (label, vector)
> }.transform(projectToLowerDimension)
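>
> (For example, a record whose value is ("sensor-1", "0.4,1.2,3.1") comes
> out of the map as ("sensor-1", Vectors.dense(0.4, 1.2, 3.1)).)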
>
> In the above snippet, if I keep the call to transform in the last line, I get
> the following exception:
>
> Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not
> safe for multi-threaded access
>     at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
>     at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
>     at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
>     at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>     at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>     at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>     at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
>     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
>     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
>     at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
>     at scala.collection.AbstractIterator.to(Iterator.scala:1336)
>     at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
>     at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
>     ....
>
> The function passed to transform runs a PCA and projects onto the top 2
> principal components:
>
> import org.apache.spark.mllib.feature.PCA
> import org.apache.spark.rdd.RDD
>
> private def projectToLowerDimension: RDD[(String, Vector)] => RDD[(String, Vector)] = { rdd =>
>   if (rdd.isEmpty) rdd else {
>     // reduce to 2 dimensions
>     val pca = new PCA(2).fit(rdd.map(_._2))
>
>     // project the vectors onto the linear space spanned by the top 2
>     // principal components, keeping the label
>     rdd.map(p => (p._1, pca.transform(p._2)))
>   }
> }
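>
> My guess so far is that isEmpty launches one Spark job, fit a second, and
> the final map a third, so the underlying KafkaRDD partitions get consumed
> more than once and the cached KafkaConsumer is hit from multiple threads.
> A sketch of a workaround I am considering (untested; it assumes persisting
> the RDD before the extra passes is enough to avoid re-reading Kafka):
>
> import org.apache.spark.storage.StorageLevel
>
> private def projectToLowerDimension: RDD[(String, Vector)] => RDD[(String, Vector)] = { rdd =>
>   // persist first so each Kafka partition is read only once; the jobs
>   // triggered by isEmpty, fit, and the final map then reuse cached blocks
>   val cached = rdd.persist(StorageLevel.MEMORY_ONLY)
>   if (cached.isEmpty) cached else {
>     val pca = new PCA(2).fit(cached.map(_._2))
>     cached.map(p => (p._1, pca.transform(p._2)))
>   }
> }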
>
> However, if I remove the transform call, everything processes correctly.
>
> Any help will be most welcome.
>
> regards.
> - Debasish
>

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
