Can you give this a try:

conf = SparkConf().set("spark.executor.memory", "32G").set("spark.akka.frameSize", "10000000").set("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")
sc = SparkContext(conf = conf)
rdd = sc.parallelize(matrix, 5)

from pyspark.mllib.clustering import KMeans
from math import sqrt

clusters = KMeans.train(rdd, 5, maxIterations=2, runs=2,
                        initializationMode="random")

def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print "Within Set Sum of Squared Error = " + str(WSSSE)
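[Editor's note: the error/WSSSE computation above is just the summed Euclidean distance from each point to its nearest center. A minimal pure-Python illustration, with hypothetical toy points and centers standing in for `matrix` and the trained model, no Spark required:]

```python
from math import sqrt

# Hypothetical toy data standing in for `matrix` and the trained centers.
points = [(0.0, 0.0), (1.0, 0.0), (10.0, 10.0)]
centers = [(0.5, 0.0), (10.0, 10.0)]

def nearest(point):
    # Index of the closest center (what clusters.predict(point) returns).
    return min(range(len(centers)),
               key=lambda i: sum((p - c) ** 2
                                 for p, c in zip(point, centers[i])))

def error(point):
    # Euclidean distance from a point to its nearest center, mirroring
    # the error() function in the snippet above.
    center = centers[nearest(point)]
    return sqrt(sum((p - c) ** 2 for p, c in zip(point, center)))

# Sum of per-point errors, as the rdd.map(...).reduce(...) does on the cluster.
wssse = sum(error(p) for p in points)
print(wssse)  # prints 1.0
```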
Thanks
Best Regards

On Mon, Sep 15, 2014 at 9:16 AM, Chengi Liu <chengi.liu...@gmail.com> wrote:

> And the thing is, the code runs just fine if I reduce the number of rows in my data?
>
> On Sun, Sep 14, 2014 at 8:45 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>
>> I am using Spark 1.0.2.
>> This is my work cluster, so I can't set up a new version readily...
>> But right now, I am not using broadcast:
>>
>> conf = SparkConf().set("spark.executor.memory", "32G").set("spark.akka.frameSize", "1000")
>> sc = SparkContext(conf = conf)
>> rdd = sc.parallelize(matrix, 5)
>>
>> from pyspark.mllib.clustering import KMeans
>> from math import sqrt
>> clusters = KMeans.train(rdd, 5, maxIterations=2, runs=2,
>>                         initializationMode="random")
>> def error(point):
>>     center = clusters.centers[clusters.predict(point)]
>>     return sqrt(sum([x**2 for x in (point - center)]))
>>
>> WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y)
>> print "Within Set Sum of Squared Error = " + str(WSSSE)
>>
>> executed by:
>> spark-submit --master $SPARKURL clustering_example.py --executor-memory 32G --driver-memory 60G
>>
>> and the error I see:
>> py4j.protocol.Py4JJavaError: An error occurred while calling o26.trainKMeansModel.
>> : org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up.
>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
>> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>> at scala.Option.foreach(Option.scala:236)
>> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> and
>>
>> 14/09/14 20:43:30 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@hostname:7077: akka.remote.EndpointAssociationException: Association failed with
>> [akka.tcp://sparkMaster@hostname:7077]
>>
>> ??
>> Any suggestions??
>>
>> On Sun, Sep 14, 2014 at 8:39 PM, Davies Liu <dav...@databricks.com> wrote:
>>
>>> Hey Chengi,
>>>
>>> What's the version of Spark you are using? It has big improvements
>>> around broadcast in 1.1; could you try it?
>>>
>>> On Sun, Sep 14, 2014 at 8:29 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>>>
>>>> Any suggestions.. I am really blocked on this one.
>>>>
>>>> On Sun, Sep 14, 2014 at 2:43 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>>>>
>>>>> And when I use the spark-submit script, I get the following error:
>>>>>
>>>>> py4j.protocol.Py4JJavaError: An error occurred while calling o26.trainKMeansModel.
>>>>> : org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up.
>>>>> (same stack trace as above)
>>>>>
>>>>> My spark-submit code is:
>>>>>
>>>>> conf = SparkConf().set("spark.executor.memory", "32G").set("spark.akka.frameSize", "1000")
>>>>> sc = SparkContext(conf = conf)
>>>>> rdd = sc.parallelize(matrix, 5)
>>>>>
>>>>> from pyspark.mllib.clustering import KMeans
>>>>> from math import sqrt
>>>>> clusters = KMeans.train(rdd, 5, maxIterations=2, runs=2,
>>>>>                         initializationMode="random")
>>>>> def error(point):
>>>>>     center = clusters.centers[clusters.predict(point)]
>>>>>     return sqrt(sum([x**2 for x in (point - center)]))
>>>>>
>>>>> WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y)
>>>>> print "Within Set Sum of Squared Error = " + str(WSSSE)
>>>>>
>>>>> Which is executed as follows:
>>>>> spark-submit --master $SPARKURL clustering_example.py --executor-memory 32G --driver-memory 60G
>>>>>
>>>>> On Sun, Sep 14, 2014 at 10:47 AM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>>>>>
>>>>>> How? Example please..
>>>>>> Also, if I am running this in the pyspark shell, how do I configure spark.akka.frameSize??
>>>>>>
>>>>>> On Sun, Sep 14, 2014 at 7:43 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>>
>>>>>>> When the data size is huge, you are better off using the TorrentBroadcastFactory.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Best Regards
>>>>>>>
>>>>>>> On Sun, Sep 14, 2014 at 2:54 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Specifically, the error I see when I try to operate on an rdd created by the sc.parallelize method:
>>>>>>>>
>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 12:12 was 12062263 bytes which exceeds spark.akka.frameSize (10485760 bytes). Consider using broadcast variables for large values.
>>>>>>>>
>>>>>>>> On Sun, Sep 14, 2014 at 2:20 AM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>> I am trying to create an rdd out of a large matrix... sc.parallelize suggests using broadcast.
>>>>>>>>> But when I do
>>>>>>>>>
>>>>>>>>> sc.broadcast(data)
>>>>>>>>>
>>>>>>>>> I get this error:
>>>>>>>>>
>>>>>>>>> Traceback (most recent call last):
>>>>>>>>>   File "<stdin>", line 1, in <module>
>>>>>>>>>   File "/usr/common/usg/spark/1.0.2/python/pyspark/context.py", line 370, in broadcast
>>>>>>>>>     pickled = pickleSer.dumps(value)
>>>>>>>>>   File "/usr/common/usg/spark/1.0.2/python/pyspark/serializers.py", line 279, in dumps
>>>>>>>>>     def dumps(self, obj): return cPickle.dumps(obj, 2)
>>>>>>>>> SystemError: error return without exception set
>>>>>>>>>
>>>>>>>>> Help?
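[Editor's note: the frameSize error quoted above is at heart a sizing problem: each serialized task must fit in spark.akka.frameSize (10 MB by default), so a large parallelized matrix needs enough partitions that no single task's payload exceeds the frame. A rough back-of-the-envelope sketch using the numbers from the error message; min_partitions is a hypothetical helper, not part of any Spark API:]

```python
def min_partitions(total_bytes, frame_size_bytes, safety=0.5):
    # Smallest partition count that keeps each task's serialized payload
    # under safety * frame_size_bytes, leaving headroom for task overhead.
    budget = int(frame_size_bytes * safety)
    return -(-total_bytes // budget)  # ceiling division

# From the error: one of the 5 tasks serialized to 12,062,263 bytes,
# over the 10,485,760-byte (10 MB) default spark.akka.frameSize.
task_bytes = 12062263
total_bytes = task_bytes * 5  # rough total payload across the 5 partitions used
print(min_partitions(total_bytes, 10 * 1024 * 1024))  # prints 12
```

With sc.parallelize(matrix, numSlices) one would then pass a slice count at least this large, or raise spark.akka.frameSize (specified in MB) as suggested earlier in the thread.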