OK, let me look into it a bit more. Most likely I will deploy Spark v1.1
and then use the mllib 1.1 SNAPSHOT jar with it, so that we follow your
guideline of not running a newer Spark component on an older version of
Spark core.

That should solve this issue, unless it is related to the Java versions.

I am also keen to see the final recommendations with the L1 and
positivity variants. I will compute the prec@k and ndcg@k metrics.
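
For reference, prec@k and ndcg@k can be computed per user along the
following lines. This is only a minimal Python sketch assuming binary
relevance (an item is either in the held-out set or not); the function
names and the toy data are made up for illustration and are not part of
MLlib.

```python
import math


def precision_at_k(recommended, relevant, k):
    """Fraction of the top-k recommended items that are relevant.

    Divides by k even when fewer than k items were recommended.
    """
    if k <= 0:
        raise ValueError("k must be positive")
    hits = sum(1 for item in recommended[:k] if item in relevant)
    return hits / k


def ndcg_at_k(recommended, relevant, k):
    """Binary-relevance NDCG@k: DCG of the ranking over the ideal DCG."""
    if k <= 0:
        raise ValueError("k must be positive")
    # Each relevant item at 0-based position `rank` contributes 1 / log2(rank + 2).
    dcg = sum(
        1.0 / math.log2(rank + 2)
        for rank, item in enumerate(recommended[:k])
        if item in relevant
    )
    # The ideal ranking places all relevant items (up to k) at the top.
    ideal_hits = min(len(relevant), k)
    idcg = sum(1.0 / math.log2(rank + 2) for rank in range(ideal_hits))
    return dcg / idcg if idcg > 0 else 0.0


# Hypothetical toy data: a ranked list for one user and that user's held-out items.
ranked = ["a", "b", "c", "d"]
held_out = {"a", "c"}
p_at_3 = precision_at_k(ranked, held_out, 3)
n_at_3 = ndcg_at_k(ranked, held_out, 3)
```

The per-user values would then be averaged over all test users, once for
each of the variants being compared.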

Our plan is to use scalable matrix factorization as an engine to do
clustering, feature extraction, topic modeling and autoencoders (single
layer to start with). So these algorithms are not really constrained to
recommendation use cases.
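
To make the shared building block concrete: in ALS-style factorization,
each user or product factor is the solution of a small quadratic problem,
and the constraints are what change between use cases. Below is a rough
Python sketch of one such subproblem with a positivity constraint, solved
by plain projected gradient. It is not the QuadraticMinimizer discussed in
this thread; the function name, the step size, and the toy matrices are
assumptions for illustration. It assumes H is symmetric positive
semidefinite and that the step is no larger than 1 over the largest
eigenvalue of H.

```python
def projected_gradient_qp(H, c, step, iterations=200):
    """Minimize 0.5 * x^T H x + c^T x subject to x >= 0.

    H is a list of rows (assumed symmetric positive semidefinite),
    c is a list of floats, and step is a fixed step size.
    """
    n = len(c)
    x = [0.0] * n
    for _ in range(iterations):
        # Gradient of the quadratic objective: H x + c.
        grad = [sum(H[i][j] * x[j] for j in range(n)) + c[i] for i in range(n)]
        # Gradient step, then projection onto the nonnegative orthant.
        x = [max(0.0, x[i] - step * grad[i]) for i in range(n)]
    return x


# Hypothetical toy problem: the unconstrained minimizer is (1, -1),
# so the positivity constraint is active on the second coordinate.
H = [[2.0, 0.0], [0.0, 2.0]]
c = [-2.0, 2.0]
solution = projected_gradient_qp(H, c, step=0.5)
```

Swapping the projection for a different one (for example, clipping to
bounds) changes the constraint without touching the rest of the loop.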



On Wed, Aug 6, 2014 at 9:09 AM, Xiangrui Meng <men...@gmail.com> wrote:

> One thing I'd like to clarify is that we do not support running a newer
> version of a Spark component on top of an older version of Spark core.
> I don't remember any code change in MLlib that requires Spark v1.1, but
> I might have missed some PRs. There were changes to CoGroup, which may be
> relevant:
>
>
> https://github.com/apache/spark/commits/master/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
>
> Btw, for the constrained optimization, I'm really interested in how
> the variants differ in the final recommendations. It would be great if
> you could test the prec@k or ndcg@k metrics.
>
> Best,
> Xiangrui
>
> On Wed, Aug 6, 2014 at 8:28 AM, Debasish Das <debasish.da...@gmail.com>
> wrote:
> > Hi Xiangrui,
> >
> > Maintaining another file will be a pain later, so I deployed Spark 1.0.1
> > without mllib, and my application jar bundles mllib 1.1.0-SNAPSHOT along
> > with the code changes for quadratic optimization.
> >
> > Later the plan is to patch the snapshot mllib with the deployed stable
> > mllib.
> >
> > There are 5 variants that I am experimenting with on around 400M ratings
> > (daily data; I will update with monthly data in a few days):
> >
> > 1. LS
> > 2. NNLS
> > 3. Quadratic with bounds
> > 4. Quadratic with L1
> > 5. Quadratic with equality and positivity
> >
> > Now the ALS 1.1.0 snapshot runs fine, but after completion, at this step
> > (ALS.scala:311):
> >
> > // Materialize usersOut and productsOut.
> > usersOut.count()
> >
> > I am getting this from one of the executors: java.lang.ClassCastException:
> > scala.Tuple1 cannot be cast to scala.Product2
> >
> > I am debugging it further, but I was wondering if this is due to an RDD
> > incompatibility between 1.0.1 and 1.1.0-SNAPSHOT?
> >
> > I built the jars on my Mac, which has Java 1.7.0_55, but the deployed
> > cluster has Java 1.7.0_45.
> >
> > The flow runs fine on my localhost Spark 1.0.1 with 1 worker. Can that
> > Java version mismatch cause this?
> >
> > Stack traces are below
> >
> > Thanks.
> > Deb
> >
> >
> > Executor stacktrace:
> >
> >         org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$4.apply(CoGroupedRDD.scala:156)
> >         org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$4.apply(CoGroupedRDD.scala:154)
> >         scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> >         scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> >         org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:154)
> >         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> >         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> >         org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
> >         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> >         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> >         org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
> >         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> >         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> >         org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
> >         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> >         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> >         org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:126)
> >         org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:123)
> >         scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> >         scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> >         scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> >         scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> >         org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:123)
> >         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> >         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> >         org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
> >         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> >         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> >         org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
> >         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> >         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> >         org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
> >         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> >         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> >         org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
> >         org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> >         org.apache.spark.scheduler.Task.run(Task.scala:51)
> >         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
> >         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> >         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> >         java.lang.Thread.run(Thread.java:744)
> >
> > Driver stacktrace:
> >
> > at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)
> > at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
> > at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)
> > at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> > at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026)
> > at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
> > at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
> > at scala.Option.foreach(Option.scala:236)
> > at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634)
> > at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229)
> > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> > at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> > at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> > at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >
> >
> >
> > On Tue, Aug 5, 2014 at 5:59 PM, Debasish Das <debasish.da...@gmail.com>
> > wrote:
> >>
> >> Hi Xiangrui,
> >>
> >> I used your idea and kept a cherry-picked version of ALS.scala in my
> >> application, calling it ALSQp.scala. This is an OK workaround for now,
> >> until a version lands in master, for example.
> >>
> >> For the bug with userClassPathFirst, looks like Koert already found this
> >> issue in the following JIRA:
> >>
> >> https://issues.apache.org/jira/browse/SPARK-1863
> >>
> >> By the way, the userClassPathFirst feature is very useful, since I am sure
> >> the deployed version of Spark on a production cluster will always be the
> >> last stable release (core at 1.0.1 in my case), and people would like to
> >> deploy SNAPSHOT versions of libraries that build on top of Spark core
> >> (mllib, streaming, etc.).
> >>
> >> Another way is to have a build option that deploys only the core and not
> >> the libraries built upon core.
> >>
> >> Do we have an option like that in the make-distribution script?
> >>
> >> Thanks.
> >> Deb
> >>
> >>
> >> On Tue, Aug 5, 2014 at 10:37 AM, Xiangrui Meng <men...@gmail.com> wrote:
> >>>
> >>> If you cannot change the Spark jar deployed on the cluster, an easy
> >>> solution would be renaming ALS in your jar. If userClassPathFirst
> >>> doesn't work, could you create a JIRA and attach the log? Thanks!
> >>> -Xiangrui
> >>>
> >>> On Tue, Aug 5, 2014 at 9:10 AM, Debasish Das <debasish.da...@gmail.com>
> >>> wrote:
> >>> > I created the assembly file, but it still wants to pick up the mllib
> >>> > from the cluster:
> >>> >
> >>> > jar tf ./target/ml-0.0.1-SNAPSHOT-jar-with-dependencies.jar | grep
> >>> > QuadraticMinimizer
> >>> >
> >>> > org/apache/spark/mllib/optimization/QuadraticMinimizer$$anon$1.class
> >>> >
> >>> > /Users/v606014/dist-1.0.1/bin/spark-submit --master
> >>> > spark://TUSCA09LMLVT00C.local:7077 --class ALSDriver
> >>> > ./target/ml-0.0.1-SNAPSHOT-jar-with-dependencies.jar inputPath
> >>> > outputPath
> >>> >
> >>> > Exception in thread "main" java.lang.NoSuchMethodError:
> >>> > org.apache.spark.mllib.recommendation.ALS.setLambdaL1(D)Lorg/apache/spark/mllib/recommendation/ALS;
> >>> >
> >>> > Now if I force it to use the jar that I gave using
> >>> > spark.files.userClassPathFirst, then it fails with some serialization
> >>> > issues.
> >>> >
> >>> > A simple solution is to cherry-pick the files I need from the Spark
> >>> > branch into the application branch, but I am not sure that's the right
> >>> > thing to do.
> >>> >
> >>> > The way userClassPathFirst is behaving, there might be bugs in it.
> >>> >
> >>> > Any suggestions will be appreciated.
> >>> >
> >>> > Thanks.
> >>> > Deb
> >>> >
> >>> >
> >>> > On Sat, Aug 2, 2014 at 11:12 AM, Xiangrui Meng <men...@gmail.com>
> >>> > wrote:
> >>> >>
> >>> >> Yes, that should work. spark-mllib-1.1.0 should be compatible with
> >>> >> spark-core-1.0.1.
> >>> >>
> >>> >> On Sat, Aug 2, 2014 at 10:54 AM, Debasish Das
> >>> >> <debasish.da...@gmail.com>
> >>> >> wrote:
> >>> >> > Let me try it...
> >>> >> >
> >>> >> > Will this be fixed if I generate an assembly file with the mllib-1.1.0
> >>> >> > SNAPSHOT jar and other dependencies, along with the rest of the
> >>> >> > application code?
> >>> >> >
> >>> >> >
> >>> >> >
> >>> >> > On Sat, Aug 2, 2014 at 10:46 AM, Xiangrui Meng <men...@gmail.com>
> >>> >> > wrote:
> >>> >> >>
> >>> >> >> You can try enabling "spark.files.userClassPathFirst". But I'm not
> >>> >> >> sure whether it could solve your problem. -Xiangrui
> >>> >> >>
> >>> >> >> On Sat, Aug 2, 2014 at 10:13 AM, Debasish Das
> >>> >> >> <debasish.da...@gmail.com>
> >>> >> >> wrote:
> >>> >> >> > Hi,
> >>> >> >> >
> >>> >> >> > I have deployed Spark stable 1.0.1 on the cluster, but I have new
> >>> >> >> > code that I added in mllib-1.1.0-SNAPSHOT.
> >>> >> >> >
> >>> >> >> > I am trying to access the new code using spark-submit as follows:
> >>> >> >> >
> >>> >> >> > spark-job --class com.verizon.bda.mllib.recommendation.ALSDriver
> >>> >> >> > --executor-memory 16g --total-executor-cores 16 --jars
> >>> >> >> > spark-mllib_2.10-1.1.0-SNAPSHOT.jar,scopt_2.10-3.2.0.jar
> >>> >> >> > sag-core-0.0.1-SNAPSHOT.jar --rank 25 --numIterations 10 --lambda 1.0
> >>> >> >> > --qpProblem 2 inputPath outputPath
> >>> >> >> >
> >>> >> >> > I can see the jars are getting added to httpServer as expected:
> >>> >> >> >
> >>> >> >> > 14/08/02 12:50:04 INFO SparkContext: Added JAR
> >>> >> >> > file:/vzhome/v606014/spark-glm/spark-mllib_2.10-1.1.0-SNAPSHOT.jar at
> >>> >> >> > http://10.145.84.20:37798/jars/spark-mllib_2.10-1.1.0-SNAPSHOT.jar
> >>> >> >> > with timestamp 1406998204236
> >>> >> >> >
> >>> >> >> > 14/08/02 12:50:04 INFO SparkContext: Added JAR
> >>> >> >> > file:/vzhome/v606014/spark-glm/scopt_2.10-3.2.0.jar at
> >>> >> >> > http://10.145.84.20:37798/jars/scopt_2.10-3.2.0.jar
> >>> >> >> > with timestamp 1406998204237
> >>> >> >> >
> >>> >> >> > 14/08/02 12:50:04 INFO SparkContext: Added JAR
> >>> >> >> > file:/vzhome/v606014/spark-glm/sag-core-0.0.1-SNAPSHOT.jar at
> >>> >> >> > http://10.145.84.20:37798/jars/sag-core-0.0.1-SNAPSHOT.jar
> >>> >> >> > with timestamp 1406998204238
> >>> >> >> >
> >>> >> >> > But the job still can't access the code from the mllib-1.1.0
> >>> >> >> > SNAPSHOT jar. I think it's picking up the mllib from the cluster,
> >>> >> >> > which is at 1.0.1.
> >>> >> >> >
> >>> >> >> > Please help. I will ask for a PR tomorrow, but internally we want
> >>> >> >> > to generate results from the new code.
> >>> >> >> >
> >>> >> >> > Thanks.
> >>> >> >> >
> >>> >> >> > Deb
> >>> >> >
> >>> >> >
> >>> >
> >>> >
> >>
> >>
> >
>
