So.. same result with parallelize(matrix, 1000) with broadcast.. seems like I got a JVM core dump :-/

14/09/15 02:31:22 INFO BlockManagerInfo: Registering block manager host:47978 with 19.2 GB RAM
14/09/15 02:31:22 INFO BlockManagerInfo: Registering block manager host:43360 with 19.2 GB RAM

Unhandled exception
Type=Segmentation error vmState=0x00000000
J9Generic_Signal_Number=00000004 Signal_Number=0000000b Error_Value=00000000 Signal_Code=00000001
Handler1=00002AAAABF53760 Handler2=00002AAAAC3069D0 InaccessibleAddress=0000000000000000
RDI=00002AB9505F2698 RSI=00002AABAE2C54D8 RAX=00002AB7CE6009A0 RBX=00002AB7CE6009C0
RCX=00000000FFC7FFE0 RDX=00002AB8509726A8 R8=000000007FE41FF0 R9=0000000000002000
R10=00002AAAADA318A0 R11=00002AB850959520 R12=00002AB5EF97DD88 R13=00002AB5EF97BD88
R14=00002AAAAC0CE940 R15=00002AB5EF97BD88
RIP=0000000000000000 GS=0000 FS=0000 RSP=00000000007367A0
EFlags=0000000000210282 CS=0033 RBP=0000000000BCDB00 ERR=0000000000000014
TRAPNO=000000000000000E OLDMASK=0000000000000000 CR2=0000000000000000
xmm0 4141414141414141 (f: 1094795648.000000, d: 2.261635e+06)
xmm1 4141414141414141 (f: 1094795648.000000, d: 2.261635e+06)
xmm2 4141414141414141 (f: 1094795648.000000, d: 2.261635e+06)
xmm3 4141414141414141 (f: 1094795648.000000, d: 2.261635e+06)
xmm4 4141414141414141 (f: 1094795648.000000, d: 2.261635e+06)
xmm5 4141414141414141 (f: 1094795648.000000, d: 2.261635e+06)
xmm6 4141414141414141 (f: 1094795648.000000, d: 2.261635e+06)
xmm7 f180c714f8e2a139 (f: 4175601920.000000, d: -5.462583e+238)
xmm8 00000000428e8000 (f: 1116635136.000000, d: 5.516911e-315)
xmm9 0000000000000000 (f: 0.000000, d: 0.000000e+00)
xmm10 0000000000000000 (f: 0.000000, d: 0.000000e+00)
xmm11 0000000000000000 (f: 0.000000, d: 0.000000e+00)
xmm12 0000000000000000 (f: 0.000000, d: 0.000000e+00)
xmm13 0000000000000000 (f: 0.000000, d: 0.000000e+00)
xmm14 0000000000000000 (f: 0.000000, d: 0.000000e+00)
xmm15 0000000000000000 (f: 0.000000, d: 0.000000e+00)
Target=2_60_20140106_181350 (Linux 3.0.93-0.8.2_1.0502.8048-cray_ari_c)
CPU=amd64 (48 logical CPUs) (0xfc0c5b000 RAM)
----------- Stack Backtrace -----------
(0x00002AAAAC2FA122 [libj9prt26.so+0x13122])
(0x00002AAAAC30779F [libj9prt26.so+0x2079f])
(0x00002AAAAC2F9E6B [libj9prt26.so+0x12e6b])
(0x00002AAAAC2F9F67 [libj9prt26.so+0x12f67])
(0x00002AAAAC30779F [libj9prt26.so+0x2079f])
(0x00002AAAAC2F9A8B [libj9prt26.so+0x12a8b])
(0x00002AAAABF52C9D [libj9vm26.so+0x1ac9d])
(0x00002AAAAC30779F [libj9prt26.so+0x2079f])
(0x00002AAAABF52F56 [libj9vm26.so+0x1af56])
(0x00002AAAABF96CA0 [libj9vm26.so+0x5eca0])
---------------------------------------
JVMDUMP039I JVMDUMP032I
Note, this is still with the frame size I modified in the last email thread.

On Mon, Sep 15, 2014 at 2:12 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

> Try:
>
> rdd = sc.broadcast(matrix)
>
> Or
>
> rdd = sc.parallelize(matrix, 100) // Just increase the number of slices, give it a try.
>
> Thanks
> Best Regards
>
> On Mon, Sep 15, 2014 at 2:18 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>
>> Hi Akhil,
>> So with your config (specifically with set("spark.akka.frameSize ", "10000000")), I see the error:
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure:
>> Serialized task 0:0 was 401970046 bytes which exceeds spark.akka.frameSize
>> (10485760 bytes). Consider using broadcast variables for large values.
>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
>> at org.apache.spark
>>
>> So, I changed
>> set("spark.akka.frameSize ", "10000000") to set("spark.akka.frameSize ", "10000000*00*")
>> but now I get the same error?
>>
>> py4j.protocol.Py4JJavaError: An error occurred while calling o28.trainKMeansModel.
>> : org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up.
>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
>> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGSched
>>
>> along with the following:
>>
>> 14/09/15 01:44:11 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
>> 14/09/15 01:44:21 INFO AppClient$ClientActor: Connecting to master spark://host:7077...
>> 14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@host:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@host:7077]
>> 14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@host:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@host:7077]
>> 14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@host:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@host:7077]
>> 14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@host:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@host:7077]
>> 14/09/15 01:44:26 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
>> 14/09/15 01:44:41 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
>> 14/09/15 01:44:41 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.
>>
>> :-(
>>
>> On Mon, Sep 15, 2014 at 1:20 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>
>>> Can you give this a try:
>>>
>>>> conf = SparkConf().set("spark.executor.memory", "32G")*.set("spark.akka.frameSize ", "10000000").set("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")*
>>>> sc = SparkContext(conf = conf)
>>>> rdd = sc.parallelize(matrix, 5)
>>>> from pyspark.mllib.clustering import KMeans
>>>> from math import sqrt
>>>> clusters = KMeans.train(rdd, 5, maxIterations=2, runs=2, initializationMode="random")
>>>> def error(point):
>>>>     center = clusters.centers[clusters.predict(point)]
>>>>     return sqrt(sum([x**2 for x in (point - center)]))
>>>> WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y)
>>>> print "Within Set Sum of Squared Error = " + str(WSSSE)
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Mon, Sep 15, 2014 at 9:16 AM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>>>
>>>> And the thing is the code runs just fine if I reduce the number of rows in my data?
>>>>
>>>> On Sun, Sep 14, 2014 at 8:45 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>>>>
>>>>> I am using spark 1.0.2.
>>>>> This is my work cluster.. so I can't set up a new version readily...
>>>>> But right now, I am not using broadcast..
>>>>>
>>>>> conf = SparkConf().set("spark.executor.memory", "32G").set("spark.akka.frameSize", "1000")
>>>>> sc = SparkContext(conf = conf)
>>>>> rdd = sc.parallelize(matrix, 5)
>>>>>
>>>>> from pyspark.mllib.clustering import KMeans
>>>>> from math import sqrt
>>>>> clusters = KMeans.train(rdd, 5, maxIterations=2, runs=2, initializationMode="random")
>>>>> def error(point):
>>>>>     center = clusters.centers[clusters.predict(point)]
>>>>>     return sqrt(sum([x**2 for x in (point - center)]))
>>>>>
>>>>> WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y)
>>>>> print "Within Set Sum of Squared Error = " + str(WSSSE)
>>>>>
>>>>> executed by
>>>>> spark-submit --master $SPARKURL clustering_example.py --executor-memory 32G --driver-memory 60G
>>>>>
>>>>> and the error I see
>>>>> py4j.protocol.Py4JJavaError: An error occurred while calling o26.trainKMeansModel.
>>>>> : org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up.
>>>>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
>>>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
>>>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
>>>>> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
>>>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>>>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>>>>> at scala.Option.foreach(Option.scala:236)
>>>>> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
>>>>> at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>>>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>
>>>>> and
>>>>> 14/09/14 20:43:30 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@hostname:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@hostname:7077]
>>>>>
>>>>> ??
>>>>> Any suggestions??
>>>>>
>>>>> On Sun, Sep 14, 2014 at 8:39 PM, Davies Liu <dav...@databricks.com> wrote:
>>>>>
>>>>>> Hey Chengi,
>>>>>>
>>>>>> What's the version of Spark you are using? It has big improvements for broadcast in 1.1, could you try it?
>>>>>>
>>>>>> On Sun, Sep 14, 2014 at 8:29 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>>>>>> > Any suggestions.. I am really blocked on this one
>>>>>> >
>>>>>> > On Sun, Sep 14, 2014 at 2:43 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>>>>>> >>
>>>>>> >> And when I use the spark-submit script, I get the following error:
>>>>>> >>
>>>>>> >> py4j.protocol.Py4JJavaError: An error occurred while calling o26.trainKMeansModel.
>>>>>> >> : org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up.
>>>>>> >> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
>>>>>> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
>>>>>> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
>>>>>> >> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>> >> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>> >> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
>>>>>> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>>>>>> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>>>>>> >> at scala.Option.foreach(Option.scala:236)
>>>>>> >> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
>>>>>> >> at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
>>>>>> >> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>>>>> >> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>>>>> >> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>>>>> >> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>>>>> >> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>>>> >> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>> >> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>> >> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>> >> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>> >>
>>>>>> >> My spark-submit code is
>>>>>> >>
>>>>>> >> conf = SparkConf().set("spark.executor.memory", "32G").set("spark.akka.frameSize", "1000")
>>>>>> >> sc = SparkContext(conf = conf)
>>>>>> >> rdd = sc.parallelize(matrix, 5)
>>>>>> >>
>>>>>> >> from pyspark.mllib.clustering import KMeans
>>>>>> >> from math import sqrt
>>>>>> >> clusters = KMeans.train(rdd, 5, maxIterations=2, runs=2, initializationMode="random")
>>>>>> >> def error(point):
>>>>>> >>     center = clusters.centers[clusters.predict(point)]
>>>>>> >>     return sqrt(sum([x**2 for x in (point - center)]))
>>>>>> >>
>>>>>> >> WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y)
>>>>>> >> print "Within Set Sum of Squared Error = " + str(WSSSE)
>>>>>> >>
>>>>>> >> Which is executed as follows:
>>>>>> >> spark-submit --master $SPARKURL clustering_example.py --executor-memory 32G --driver-memory 60G
>>>>>> >>
>>>>>> >> On Sun, Sep 14, 2014 at 10:47 AM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>>>>>> >>>
>>>>>> >>> How? Example please..
>>>>>> >>> Also, if I am running this in the pyspark shell.. how do I configure spark.akka.frameSize??
>>>>>> >>>
>>>>>> >>> On Sun, Sep 14, 2014 at 7:43 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>> >>>>
>>>>>> >>>> When the data size is huge, you're better off using the TorrentBroadcastFactory.
>>>>>> >>>>
>>>>>> >>>> Thanks
>>>>>> >>>> Best Regards
>>>>>> >>>>
>>>>>> >>>> On Sun, Sep 14, 2014 at 2:54 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>>>>>> >>>>>
>>>>>> >>>>> Specifically, the error I see when I try to operate on an rdd created by the sc.parallelize method:
>>>>>> >>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>> >>>>> Serialized task 12:12 was 12062263 bytes which exceeds spark.akka.frameSize
>>>>>> >>>>> (10485760 bytes). Consider using broadcast variables for large values.
>>>>>> >>>>>
>>>>>> >>>>> On Sun, Sep 14, 2014 at 2:20 AM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>>>>>> >>>>>>
>>>>>> >>>>>> Hi,
>>>>>> >>>>>> I am trying to create an RDD out of a large matrix.... sc.parallelize suggests to use broadcast.
>>>>>> >>>>>> But when I do
>>>>>> >>>>>>
>>>>>> >>>>>> sc.broadcast(data)
>>>>>> >>>>>>
>>>>>> >>>>>> I get this error:
>>>>>> >>>>>>
>>>>>> >>>>>> Traceback (most recent call last):
>>>>>> >>>>>>   File "<stdin>", line 1, in <module>
>>>>>> >>>>>>   File "/usr/common/usg/spark/1.0.2/python/pyspark/context.py", line 370, in broadcast
>>>>>> >>>>>>     pickled = pickleSer.dumps(value)
>>>>>> >>>>>>   File "/usr/common/usg/spark/1.0.2/python/pyspark/serializers.py", line 279, in dumps
>>>>>> >>>>>>     def dumps(self, obj): return cPickle.dumps(obj, 2)
>>>>>> >>>>>> SystemError: error return without exception set
>>>>>> >>>>>> Help?
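
[Editor's note] For reference, below is a minimal, hedged sketch (not taken from the thread) of the pattern the frameSize error message keeps recommending: broadcast the large matrix once and parallelize only row indices, so each serialized task stays small. It assumes `matrix` is a NumPy array (or list of rows) as in the snippets above, and the name `bc_matrix` is hypothetical. Two caveats: the config key used earlier in the thread contains a trailing space ("spark.akka.frameSize "), which Spark treats as a different key, so the larger frame size likely never took effect; and on Spark 1.0.x a very large broadcast can still fail to pickle in one piece, as the cPickle SystemError at the bottom of this thread shows, in which case loading the data directly on the executors (e.g. from a shared file) may still be necessary.

# Sketch only, not the thread author's code: broadcast the matrix, parallelize indices.
# Assumes `matrix` is a NumPy array whose rows support `point - center`, and that it
# is small enough to pickle and broadcast; `bc_matrix` is a hypothetical name.
from math import sqrt
from pyspark import SparkConf, SparkContext
from pyspark.mllib.clustering import KMeans

conf = (SparkConf()
        .set("spark.executor.memory", "32G")
        .set("spark.akka.frameSize", "1000"))   # no trailing space in the key; value is in MB
sc = SparkContext(conf=conf)

bc_matrix = sc.broadcast(matrix)                 # ship the data once per executor
rdd = (sc.parallelize(range(len(matrix)), 100)   # only small index lists travel inside tasks
         .map(lambda i: bc_matrix.value[i]))

clusters = KMeans.train(rdd, 5, maxIterations=2, runs=2,
                        initializationMode="random")

def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x ** 2 for x in (point - center)]))

WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print "Within Set Sum of Squared Error = " + str(WSSSE)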