My assembly contains the 0.10.1 classes. Here are the kafka- and
spark-related dependencies in my assembly:

libraryDependencies ++= Seq(
  "org.apache.kafka"  %  "kafka-streams"               % "0.10.0.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-10"   % spark,
  "org.apache.spark" %% "spark-core"                   % spark % "provided",
  "org.apache.spark" %% "spark-streaming"              % spark % "provided",
  "org.apache.spark" %% "spark-mllib"                  % spark % "provided",
  "org.apache.spark" %% "spark-sql"                    % spark % "provided"
)
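
To rule out mixed consumer classes in the fat jar, the transitive
kafka-clients version can also be pinned explicitly. A sketch (0.10.0.1
is my guess at what spark-streaming-kafka-0-10 brings in for Spark
2.0.1; running "sbt evicted" shows which version actually wins):

// sketch: force a single kafka-clients version into the assembly
dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "0.10.0.1"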

regards.

On Fri, Dec 9, 2016 at 10:00 PM, Cody Koeninger <c...@koeninger.org> wrote:

> When you say 0.10.1, do you mean the broker version only, or does your
> assembly contain classes from the 0.10.1 kafka consumer?
>
> On Fri, Dec 9, 2016 at 10:19 AM, debasishg <ghosh.debas...@gmail.com> wrote:
> > Hello -
> >
> > I am facing some issues with the following snippet of code that reads
> > from Kafka and creates a DStream. I am using
> > KafkaUtils.createDirectStream(..) with Kafka 0.10.1 and Spark 2.0.1.
> >
> > // get the data from kafka
> > val stream: DStream[ConsumerRecord[Array[Byte], (String, String)]] =
> >   KafkaUtils.createDirectStream[Array[Byte], (String, String)](
> >     streamingContext,
> >     PreferConsistent,
> >     Subscribe[Array[Byte], (String, String)](topicToReadFrom, kafkaParams)
> >   )
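> >
> > The kafkaParams map is along these lines (the broker list and group id
> > are placeholders, and TupleDeserializer stands in for my custom value
> > Deserializer that yields the (String, String) pair):
> >
> > val kafkaParams = Map[String, Object](
> >   "bootstrap.servers"  -> "localhost:9092",
> >   "key.deserializer"   -> classOf[ByteArrayDeserializer],
> >   "value.deserializer" -> classOf[TupleDeserializer],  // hypothetical
> >   "group.id"           -> "pca-stream",
> >   "auto.offset.reset"  -> "latest"
> > )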
> >
> > // label and vectorize the value
> > val projected: DStream[(String, Vector)] = stream.map { record =>
> >   val (label, value) = record.value
> >   val vector = Vectors.dense(value.split(",").map(_.toDouble))
> >   (label, vector)
> > }.transform(projectToLowerDimension)
> >
> > In the above snippet, if I have the call to transform in the last line,
> > I get the following exception:
> >
> > Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
> >     at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
> >     at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
> >     at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
> >     at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
> >     at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> >     at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
> >     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> >     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> >     at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
> >     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
> >     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
> >     at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
> >     at scala.collection.AbstractIterator.to(Iterator.scala:1336)
> >     at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
> >     at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
> >     ....
> >
> > The transform method does a PCA and gives the top 2 principal components:
> >
> > private def projectToLowerDimension: RDD[(String, Vector)] => RDD[(String, Vector)] = { rdd =>
> >   if (rdd.isEmpty) rdd else {
> >     // reduce to 2 dimensions
> >     val pca = new PCA(2).fit(rdd.map(_._2))
> >
> >     // Project vectors to the linear space spanned by the top 2 principal
> >     // components, keeping the label
> >     rdd.map(p => (p._1, pca.transform(p._2)))
> >   }
> > }
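> >
> > One thing I wondered about but have not verified: isEmpty, the PCA fit
> > and the final map each trigger a separate job, so the same Kafka
> > partitions may be read more than once through the cached consumer.
> > A sketch of what persisting the batch first would look like:
> >
> > private def projectToLowerDimension: RDD[(String, Vector)] => RDD[(String, Vector)] = { rdd =>
> >   val cached = rdd.cache()  // read each Kafka partition only once per batch
> >   if (cached.isEmpty) cached
> >   else {
> >     val pca = new PCA(2).fit(cached.map(_._2))
> >     cached.map(p => (p._1, pca.transform(p._2)))
> >   }
> > }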
> >
> > However, if I remove the transform call, I can process everything correctly.
> >
> > Any help will be most welcome.
> >
> > regards.
> > - Debasish
> >
>



-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
