oops .. it's 0.10.0 .. sorry for the confusion ..

On Fri, Dec 9, 2016 at 10:07 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
> My assembly contains the 0.10.1 classes .. Here are the dependencies
> related to kafka & spark that my assembly has ..
>
> libraryDependencies ++= Seq(
>   "org.apache.kafka"  % "kafka-streams"               % "0.10.0.0",
>   "org.apache.spark" %% "spark-streaming-kafka-0-10"  % spark,
>   "org.apache.spark" %% "spark-core"                  % spark % "provided",
>   "org.apache.spark" %% "spark-streaming"             % spark % "provided",
>   "org.apache.spark" %% "spark-mllib"                 % spark % "provided",
>   "org.apache.spark" %% "spark-sql"                   % spark % "provided"
> )
>
> regards.
>
> On Fri, Dec 9, 2016 at 10:00 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> When you say 0.10.1, do you mean the broker version only, or does your
>> assembly contain classes from the 0.10.1 kafka consumer?
>>
>> On Fri, Dec 9, 2016 at 10:19 AM, debasishg <ghosh.debas...@gmail.com> wrote:
>> > Hello -
>> >
>> > I am facing some issues with the following snippet of code that reads
>> > from Kafka and creates a DStream. I am using
>> > KafkaUtils.createDirectStream(..) with Kafka 0.10.1 and Spark 2.0.1.
>> >
>> > // get the data from kafka
>> > val stream: DStream[ConsumerRecord[Array[Byte], (String, String)]] =
>> >   KafkaUtils.createDirectStream[Array[Byte], (String, String)](
>> >     streamingContext,
>> >     PreferConsistent,
>> >     Subscribe[Array[Byte], (String, String)](topicToReadFrom, kafkaParams)
>> >   )
>> >
>> > // label and vectorize the value
>> > val projected: DStream[(String, Vector)] = stream.map { record =>
>> >   val (label, value) = record.value
>> >   val vector = Vectors.dense(value.split(",").map(_.toDouble))
>> >   (label, vector)
>> > }.transform(projectToLowerDimension)
>> >
>> > In the above snippet, if I have the call to transform in the last line,
>> > I get the following exception ..
>> >
>> > Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not
>> > safe for multi-threaded access
>> >   at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
>> >   at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
>> >   at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
>> >   at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
>> >   at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>> >   at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>> >   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>> >   at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
>> >   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>> >   at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>> >   at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
>> >   at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
>> >   at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
>> >   at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
>> >   at scala.collection.AbstractIterator.to(Iterator.scala:1336)
>> >   at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
>> >   at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
>> >   ....
>> >
>> > The transform method does a PCA and gives the top 2 principal components
>> > (the method itself is quoted below, after the sketch) ..
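A workaround sketch before the quoted method. Assumptions, not confirmed by the
thread: the exception arises because projectToLowerDimension launches several
Spark jobs over the same uncached KafkaRDD (one for isEmpty, more for the PCA
fit, one for the final map), and the per-executor cached KafkaConsumer for a
topic-partition ends up being touched from more than one thread. Persisting the
RDD first means the Kafka partitions are consumed only once;
projectToLowerDimensionCached is a hypothetical variant of the method below:

    // Sketch only: persist the batch RDD so the multiple passes (isEmpty,
    // PCA fit, map) read from Spark's block store instead of re-pulling the
    // Kafka partitions through the cached consumer.
    import org.apache.spark.mllib.feature.PCA
    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.rdd.RDD
    import org.apache.spark.storage.StorageLevel

    private def projectToLowerDimensionCached: RDD[(String, Vector)] => RDD[(String, Vector)] = { rdd =>
      val cached = rdd.persist(StorageLevel.MEMORY_ONLY) // single read from Kafka
      if (cached.isEmpty) cached
      else {
        // subsequent jobs reuse the cached blocks rather than the consumer
        val pca = new PCA(2).fit(cached.map(_._2))
        cached.map(p => (p._1, pca.transform(p._2)))
      }
    }

The cached blocks can be released after the batch's output operations (for
example from a foreachRDD); whether this fully removes the multi-threaded
access depends on the actual cause, so treat it as a sketch rather than a
confirmed fix.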
>> > private def projectToLowerDimension: RDD[(String, Vector)] => RDD[(String, Vector)] = { rdd =>
>> >   if (rdd.isEmpty) rdd else {
>> >     // reduce to 2 dimensions
>> >     val pca = new PCA(2).fit(rdd.map(_._2))
>> >
>> >     // Project vectors to the linear space spanned by the top 2 principal
>> >     // components, keeping the label
>> >     rdd.map(p => (p._1, pca.transform(p._2)))
>> >   }
>> > }
>> >
>> > However, if I remove the transform call, I can process everything
>> > correctly.
>> >
>> > Any help will be most welcome ..
>> >
>> > regards.
>> > - Debasish

-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
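On the 0.10.0 vs 0.10.1 confusion at the top of the thread: one mechanical way
to settle which kafka-clients version actually won on the assembly classpath is
to ask the client library itself at runtime. A sketch (the object name is made
up; AppInfoParser ships inside kafka-clients and reports the version of the jar
it was loaded from):

    import org.apache.kafka.common.utils.AppInfoParser

    object KafkaClientVersionCheck {
      def main(args: Array[String]): Unit =
        // prints the version of whichever kafka-clients jar is on the classpath
        println(s"kafka-clients on classpath: ${AppInfoParser.getVersion}")
    }

If the versions do turn out to be mixed (kafka-streams 0.10.0.0 transitively
pins kafka-clients 0.10.0.0, while Spark 2.0.x's spark-streaming-kafka-0-10 is
built against the 0.10.0.x client line, 0.10.0.1 if memory serves), one hedge
in build.sbt is to keep the streams artifact from dragging in its own client:

    // Sketch only; the version numbers are assumptions to be checked against
    // an actual dependency report (e.g. the sbt-dependency-graph plugin).
    libraryDependencies ++= Seq(
      ("org.apache.kafka" % "kafka-streams" % "0.10.0.0")
        .exclude("org.apache.kafka", "kafka-clients"),
      "org.apache.kafka"  % "kafka-clients"               % "0.10.0.1",
      "org.apache.spark" %% "spark-streaming-kafka-0-10"  % spark
    )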