Try: rdd = sc.broadcast(matrix)

Or: rdd = sc.parallelize(matrix, 100)  # just increase the number of slices, give it a try

Thanks
Best Regards
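A minimal sketch of that first suggestion, for reference (the stand-in matrix, the variable names, and the 100-slice count are assumptions, not from the thread): broadcast the matrix to each executor once and parallelize only the row indices, so each task closure stays tiny.

from pyspark import SparkContext

sc = SparkContext(appName="broadcast-sketch")

# Stand-in data; in the thread this is a large dense matrix.
matrix = [[float(i + j) for j in range(50)] for i in range(10000)]

# Ship the matrix to every executor once, instead of pickling it into
# each task closure (which is what overflows spark.akka.frameSize).
bcast = sc.broadcast(matrix)

# Parallelize only the row indices; each task reads its row from the
# broadcast value already present on the executor.
rdd = sc.parallelize(range(len(matrix)), 100).map(lambda i: bcast.value[i])

One caveat: in 1.0.x, sc.broadcast itself pickles the whole value on the driver (that is the pickleSer.dumps call in the traceback at the bottom of this thread), so this route only works if the matrix survives a single pickle call.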
On Mon, Sep 15, 2014 at 2:18 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:

Hi Akhil,
So with your config (specifically with set("spark.akka.frameSize ", "10000000")), I see the error:

org.apache.spark.SparkException: Job aborted due to stage failure:
Serialized task 0:0 was 401970046 bytes which exceeds spark.akka.frameSize
(10485760 bytes). Consider using broadcast variables for large values.
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
  at org.apache.spark ...

So, I changed set("spark.akka.frameSize ", "10000000") to set("spark.akka.frameSize ", "1000000000"), but now I get the same error?

py4j.protocol.Py4JJavaError: An error occurred while calling o28.trainKMeansModel.
: org.apache.spark.SparkException: Job aborted due to stage failure: All
masters are unresponsive! Giving up.
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGSched ...

along with the following:

14/09/15 01:44:11 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
14/09/15 01:44:21 INFO AppClient$ClientActor: Connecting to master spark://host:7077...
14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@host:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@host:7077]
(the same warning is logged three more times at 01:44:21)
14/09/15 01:44:26 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
14/09/15 01:44:41 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
14/09/15 01:44:41 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.

:-(
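Two details in the config quoted above seem worth flagging. First, the key is written as "spark.akka.frameSize " with a trailing space, and SparkConf matches keys literally, which would explain why the error still reports the 10 MB default (10485760 bytes) no matter what value is set. Second, the value is interpreted in MB, not bytes, so "10000000" and "1000000000" are not meaningful sizes. A corrected sketch (the executor memory figure is carried over from the thread; the 1000 MB frame size is an assumption):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .set("spark.executor.memory", "32G")
        .set("spark.akka.frameSize", "1000"))  # no trailing space; value is in MB
sc = SparkContext(conf=conf)

Even then, a 401970046-byte serialized task is a strong hint that the matrix belongs in a broadcast or an external data source rather than in the task closure, which is what the first reply above suggests.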
On Mon, Sep 15, 2014 at 1:20 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

Can you give this a try:

conf = SparkConf().set("spark.executor.memory", "32G").set("spark.akka.frameSize ", "10000000").set("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")
sc = SparkContext(conf = conf)
rdd = sc.parallelize(matrix, 5)

from pyspark.mllib.clustering import KMeans
from math import sqrt

clusters = KMeans.train(rdd, 5, maxIterations=2, runs=2, initializationMode="random")

def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print "Within Set Sum of Squared Error = " + str(WSSSE)

Thanks
Best Regards

On Mon, Sep 15, 2014 at 9:16 AM, Chengi Liu <chengi.liu...@gmail.com> wrote:

And the thing is, the code runs just fine if I reduce the number of rows in my data?

On Sun, Sep 14, 2014 at 8:45 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:

I am using Spark 1.0.2.
This is my work cluster, so I can't set up a new version readily...
But right now, I am not using broadcast:

conf = SparkConf().set("spark.executor.memory", "32G").set("spark.akka.frameSize", "1000")
sc = SparkContext(conf = conf)
rdd = sc.parallelize(matrix, 5)

from pyspark.mllib.clustering import KMeans
from math import sqrt

clusters = KMeans.train(rdd, 5, maxIterations=2, runs=2, initializationMode="random")

def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print "Within Set Sum of Squared Error = " + str(WSSSE)

executed by:

spark-submit --master $SPARKURL clustering_example.py --executor-memory 32G --driver-memory 60G

and the error I see:

py4j.protocol.Py4JJavaError: An error occurred while calling o26.trainKMeansModel.
: org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up.
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
  at scala.Option.foreach(Option.scala:236)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
  at akka.actor.ActorCell.invoke(ActorCell.scala:456)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
  at akka.dispatch.Mailbox.run(Mailbox.scala:219)
  at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

and

14/09/14 20:43:30 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@hostname:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@hostname:7077]

??
Any suggestions??

On Sun, Sep 14, 2014 at 8:39 PM, Davies Liu <dav...@databricks.com> wrote:

Hey Chengi,

What's the version of Spark you are using? It has big improvements around broadcast in 1.1, could you try it?

On Sun, Sep 14, 2014 at 8:29 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:

Any suggestions.. I am really blocked on this one

On Sun, Sep 14, 2014 at 2:43 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:

And when I use the spark-submit script, I get the following error:

py4j.protocol.Py4JJavaError: An error occurred while calling o26.trainKMeansModel.
: org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up.
(the stack trace is identical to the one in the 8:45 PM message above)

My spark-submit code is:

conf = SparkConf().set("spark.executor.memory", "32G").set("spark.akka.frameSize", "1000")
sc = SparkContext(conf = conf)
rdd = sc.parallelize(matrix, 5)

from pyspark.mllib.clustering import KMeans
from math import sqrt

clusters = KMeans.train(rdd, 5, maxIterations=2, runs=2, initializationMode="random")

def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print "Within Set Sum of Squared Error = " + str(WSSSE)

which is executed as follows:

spark-submit --master $SPARKURL clustering_example.py --executor-memory 32G --driver-memory 60G

On Sun, Sep 14, 2014 at 10:47 AM, Chengi Liu <chengi.liu...@gmail.com> wrote:

How? Example please..
Also, if I am running this in the pyspark shell, how do I configure spark.akka.frameSize?

On Sun, Sep 14, 2014 at 7:43 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

When the data size is huge, you are better off using the TorrentBroadcastFactory.

Thanks
Best Regards
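For context on that suggestion: in Spark 1.0.x the default is the HTTP-based broadcast factory, where every executor fetches the entire value from a server on the driver, while TorrentBroadcast splits the value into blocks that executors also serve to one another, which scales much better for large values (it became the default in 1.1). A minimal sketch of enabling it, assuming nothing beyond the property name shown in this thread:

from pyspark import SparkConf, SparkContext

conf = SparkConf().set("spark.broadcast.factory",
                       "org.apache.spark.broadcast.TorrentBroadcastFactory")
sc = SparkContext(conf=conf)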
On Sun, Sep 14, 2014 at 2:54 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:

Specifically, the error I see when I try to operate on an rdd created by the sc.parallelize method:

: org.apache.spark.SparkException: Job aborted due to stage failure:
Serialized task 12:12 was 12062263 bytes which exceeds spark.akka.frameSize
(10485760 bytes). Consider using broadcast variables for large values.

On Sun, Sep 14, 2014 at 2:20 AM, Chengi Liu <chengi.liu...@gmail.com> wrote:

Hi,
I am trying to create an rdd out of a large matrix.... sc.parallelize suggests using broadcast, but when I do

sc.broadcast(data)

I get this error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/common/usg/spark/1.0.2/python/pyspark/context.py", line 370, in broadcast
    pickled = pickleSer.dumps(value)
  File "/usr/common/usg/spark/1.0.2/python/pyspark/serializers.py", line 279, in dumps
    def dumps(self, obj): return cPickle.dumps(obj, 2)
SystemError: error return without exception set

Help?
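One route the thread never tries, sketched here under the assumption that the matrix can be written to storage all workers can reach (the file name, partition count, and stand-in data below are made up): stage the matrix on disk and let Spark read it back already partitioned, so neither a giant pickled broadcast nor an oversized task closure is involved.

import numpy as np
from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans

sc = SparkContext(appName="textfile-sketch")
matrix = np.random.rand(10000, 50)  # stand-in for the real matrix

# One row per line; on a real cluster this path should be on HDFS or
# another filesystem visible to every worker, not the driver's local disk.
np.savetxt("matrix.txt", matrix)

# Read it back partitioned; the driver never serializes the matrix itself.
rdd = (sc.textFile("matrix.txt", minPartitions=100)
         .map(lambda line: [float(x) for x in line.split()]))

clusters = KMeans.train(rdd, 5, maxIterations=2, runs=2,
                        initializationMode="random")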