Oops .. it's 0.10.0, not 0.10.1 .. sorry for the confusion ..

On Fri, Dec 9, 2016 at 10:07 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

> My assembly contains the 0.10.1 classes .. Here are the dependencies
> related to kafka & spark that my assembly has ..
>
> libraryDependencies ++= Seq(
>   "org.apache.kafka"  %  "kafka-streams"              % "0.10.0.0",
>   "org.apache.spark"  %% "spark-streaming-kafka-0-10" % spark,
>   "org.apache.spark"  %% "spark-core"                 % spark % "provided",
>   "org.apache.spark"  %% "spark-streaming"            % spark % "provided",
>   "org.apache.spark"  %% "spark-mllib"                % spark % "provided",
>   "org.apache.spark"  %% "spark-sql"                  % spark % "provided"
> )
>
> regards.
>
> On Fri, Dec 9, 2016 at 10:00 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> When you say 0.10.1 do you mean broker version only, or does your
>> assembly contain classes from the 0.10.1 kafka consumer?
>>
>> On Fri, Dec 9, 2016 at 10:19 AM, debasishg <ghosh.debas...@gmail.com> wrote:
>> > Hello -
>> >
>> > I am facing some issues with the following snippet of code that reads from
>> > Kafka and creates a DStream. I am using KafkaUtils.createDirectStream(..) with
>> > Kafka 0.10.1 and Spark 2.0.1.
>> >
>> > // get the data from kafka
>> > val stream: DStream[ConsumerRecord[Array[Byte], (String, String)]] =
>> >   KafkaUtils.createDirectStream[Array[Byte], (String, String)](
>> >     streamingContext,
>> >     PreferConsistent,
>> >     Subscribe[Array[Byte], (String, String)](topicToReadFrom, kafkaParams)
>> >   )
>> >
>> > // label and vectorize the value
>> > val projected: DStream[(String, Vector)] = stream.map { record =>
>> >   val (label, value) = record.value
>> >   val vector = Vectors.dense(value.split(",").map(_.toDouble))
>> >   (label, vector)
>> > }.transform(projectToLowerDimension)
>> >
>> > In the above snippet if I have the call to transform in the last line, I get
>> > the following exception ..
>> >
>> > Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>> >     at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
>> >     at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
>> >     at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
>> >     at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
>> >     at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>> >     at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>> >     at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>> >     at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
>> >     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>> >     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>> >     at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
>> >     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
>> >     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
>> >     at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
>> >     at scala.collection.AbstractIterator.to(Iterator.scala:1336)
>> >     at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
>> >     at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
>> >     ....
>> >
>> > The transform method does a PCA and gives the top 2 principal components ..
>> >
>> > private def projectToLowerDimension: RDD[(String, Vector)] => RDD[(String, Vector)] = { rdd =>
>> >   if (rdd.isEmpty) rdd else {
>> >     // reduce to 2 dimensions
>> >     val pca = new PCA(2).fit(rdd.map(_._2))
>> >
>> >     // Project vectors to the linear space spanned by the top 2 principal
>> >     // components, keeping the label
>> >     rdd.map(p => (p._1, pca.transform(p._2)))
>> >   }
>> > }
>> >
>> > However if I remove the transform call, I can process everything correctly.
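
A note on the transform above: rdd.isEmpty and PCA.fit both run Spark jobs, so projectToLowerDimension launches work of its own against the Kafka-backed RDD, on top of whatever output operation later consumes the result. Unless the batch is persisted first, each of those jobs goes back through the cached consumer for the same offsets. Below is a minimal sketch of persisting the batch before the PCA, assuming the mllib PCA and Vector types used in the snippet. The wrapper object CachedProjection and the choice of MEMORY_AND_DISK are my own assumptions, not something from this thread. It only cuts down how often Kafka is re-read per batch; whether that makes the ConcurrentModificationException go away is not established here.

```scala
import org.apache.spark.mllib.feature.PCA
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Hypothetical wrapper object, used only to keep this sketch self-contained.
object CachedProjection {

  // Same shape as the projectToLowerDimension function quoted above, but the
  // incoming batch is marked for persistence before any action runs on it.
  def projectToLowerDimension: RDD[(String, Vector)] => RDD[(String, Vector)] = { rdd =>
    // Assumption: the incoming RDD has no storage level assigned yet.
    val cached = rdd.persist(StorageLevel.MEMORY_AND_DISK)

    if (cached.isEmpty()) cached
    else {
      // Fit on the feature vectors only; partitions computed here are stored
      // and can be reused by the projection below.
      val pca = new PCA(2).fit(cached.map(_._2))

      // Project onto the top 2 principal components, keeping the label.
      cached.map { case (label, vector) => (label, pca.transform(vector)) }
    }
  }
}
```

It plugs in the same way as before, i.e. .transform(CachedProjection.projectToLowerDimension). Nothing in the sketch unpersists the batch, so the cached blocks stay around until Spark evicts or cleans them up.
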
>> >
>> > Any help will be most welcome ..
>> >
>> > regards.
>> > - Debasish
>> >
>> >
>> >
>> > --
>> > View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/problem-with-kafka-createDirectStream-tp28190.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > ---------------------------------------------------------------------
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>>
>
>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>



