My assembly contains the 0.10.1 classes. Here are the Kafka- and Spark-related dependencies in my assembly:
  libraryDependencies ++= Seq(
    "org.apache.kafka"  %  "kafka-streams"              % "0.10.0.0",
    "org.apache.spark" %% "spark-streaming-kafka-0-10"  % spark,
    "org.apache.spark" %% "spark-core"                  % spark % "provided",
    "org.apache.spark" %% "spark-streaming"             % spark % "provided",
    "org.apache.spark" %% "spark-mllib"                 % spark % "provided",
    "org.apache.spark" %% "spark-sql"                   % spark % "provided"
  )

regards.

On Fri, Dec 9, 2016 at 10:00 PM, Cody Koeninger <c...@koeninger.org> wrote:

> When you say 0.10.1, do you mean the broker version only, or does your
> assembly contain classes from the 0.10.1 Kafka consumer?
>
> On Fri, Dec 9, 2016 at 10:19 AM, debasishg <ghosh.debas...@gmail.com> wrote:
> > Hello -
> >
> > I am facing some issues with the following snippet of code that reads
> > from Kafka and creates a DStream. I am using KafkaUtils.createDirectStream(..)
> > with Kafka 0.10.1 and Spark 2.0.1.
> >
> >   // get the data from kafka
> >   val stream: DStream[ConsumerRecord[Array[Byte], (String, String)]] =
> >     KafkaUtils.createDirectStream[Array[Byte], (String, String)](
> >       streamingContext,
> >       PreferConsistent,
> >       Subscribe[Array[Byte], (String, String)](topicToReadFrom, kafkaParams)
> >     )
> >
> >   // label and vectorize the value
> >   val projected: DStream[(String, Vector)] = stream.map { record =>
> >     val (label, value) = record.value
> >     val vector = Vectors.dense(value.split(",").map(_.toDouble))
> >     (label, vector)
> >   }.transform(projectToLowerDimension)
> >
> > In the above snippet, if I have the call to transform in the last line,
> > I get the following exception:
> >
> >   Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
> >     at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
> >     at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
> >     at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
> >     at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
> >     at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> >     at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
> >     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> >     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> >     at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
> >     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
> >     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
> >     at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
> >     at scala.collection.AbstractIterator.to(Iterator.scala:1336)
> >     at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
> >     at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
> >     ....
> >
> > The transform method does a PCA and gives the top 2 principal components:
> >
> >   private def projectToLowerDimension: RDD[(String, Vector)] => RDD[(String, Vector)] = { rdd =>
> >     if (rdd.isEmpty) rdd
> >     else {
> >       // reduce to 2 dimensions
> >       val pca = new PCA(2).fit(rdd.map(_._2))
> >
> >       // Project vectors to the linear space spanned by the top 2 principal
> >       // components, keeping the label
> >       rdd.map(p => (p._1, pca.transform(p._2)))
> >     }
> >   }
> >
> > However, if I remove the transform call, I can process everything correctly.
> >
> > Any help will be most welcome.
> >
> > regards.
> > - Debasish
> >
> > --
> > View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/problem-with-kafka-createDirectStream-tp28190.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > ---------------------------------------------------------------------
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org

--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
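A note for readers who land on this thread with the same stack trace: inside projectToLowerDimension, the rdd.isEmpty check, the PCA(2).fit(...) call, and the final map each launch a separate pass over the same Kafka-backed RDD, so every batch re-reads the same partitions through the connector's CachedKafkaConsumer; since KafkaConsumer instances are not thread-safe, overlapping reads (for example from concurrently scheduled jobs or task retries) are one plausible way to trip the acquire() check shown above. A commonly suggested mitigation is to persist the RDD so Kafka is read only once per batch. This is a minimal sketch under that assumption; the thread itself does not confirm it resolves the error:

  import org.apache.spark.mllib.feature.PCA
  import org.apache.spark.mllib.linalg.Vector
  import org.apache.spark.rdd.RDD
  import org.apache.spark.storage.StorageLevel

  // Sketch only: persist before running several actions over the same
  // Kafka-backed RDD, so isEmpty, fit and the projection read Kafka once.
  private def projectToLowerDimension: RDD[(String, Vector)] => RDD[(String, Vector)] = { rdd =>
    val cached = rdd.persist(StorageLevel.MEMORY_AND_DISK)
    if (cached.isEmpty) cached
    else {
      val pca = new PCA(2).fit(cached.map(_._2))  // served from cache after the first pass
      cached.map { case (label, v) => (label, pca.transform(v)) }
    }
  }

A real job would also need to unpersist the cached RDD once the batch is done; that bookkeeping is omitted here for brevity.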
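On Cody's question about which consumer classes end up in the assembly: kafka-streams is the Kafka Streams library, which KafkaUtils.createDirectStream does not use; the consumer classes (kafka-clients) normally arrive transitively via spark-streaming-kafka-0-10. A hypothetical build.sbt fragment for keeping a single, connector-matched consumer version in the assembly follows; the 0.10.0.1 pin is an assumption, so verify the version the connector actually resolves (sbt's built-in evicted task shows conflicting versions):

  libraryDependencies ++= Seq(
    // brings kafka-clients in transitively at the version the connector was built against
    "org.apache.spark" %% "spark-streaming-kafka-0-10" % spark,
    // "org.apache.kafka" % "kafka-streams" % "0.10.0.0",  // drop if nothing in the job uses Kafka Streams
    "org.apache.kafka" % "kafka-clients" % "0.10.0.1"      // assumed pin; align with the connector
  )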