So, same result with parallelize(matrix, 1000).
With broadcast, it seems I got a JVM core dump :-/
14/09/15 02:31:22 INFO BlockManagerInfo: Registering block manager
host:47978 with 19.2 GB RAM
14/09/15 02:31:22 INFO BlockManagerInfo: Registering block manager
host:43360 with 19.2 GB RAM
Unhandled exception
Unhandled exception
Type=Segmentation error vmState=0x00000000
J9Generic_Signal_Number=00000004 Signal_Number=0000000b
Error_Value=00000000 Signal_Code=00000001
Handler1=00002AAAABF53760 Handler2=00002AAAAC3069D0
InaccessibleAddress=0000000000000000
RDI=00002AB9505F2698 RSI=00002AABAE2C54D8 RAX=00002AB7CE6009A0
RBX=00002AB7CE6009C0
RCX=00000000FFC7FFE0 RDX=00002AB8509726A8 R8=000000007FE41FF0
R9=0000000000002000
R10=00002AAAADA318A0 R11=00002AB850959520 R12=00002AB5EF97DD88
R13=00002AB5EF97BD88
R14=00002AAAAC0CE940 R15=00002AB5EF97BD88
RIP=0000000000000000 GS=0000 FS=0000 RSP=00000000007367A0
EFlags=0000000000210282 CS=0033 RBP=0000000000BCDB00 ERR=0000000000000014
TRAPNO=000000000000000E OLDMASK=0000000000000000 CR2=0000000000000000
xmm0 4141414141414141 (f: 1094795648.000000, d: 2.261635e+06)
xmm1 4141414141414141 (f: 1094795648.000000, d: 2.261635e+06)
xmm2 4141414141414141 (f: 1094795648.000000, d: 2.261635e+06)
xmm3 4141414141414141 (f: 1094795648.000000, d: 2.261635e+06)
xmm4 4141414141414141 (f: 1094795648.000000, d: 2.261635e+06)
xmm5 4141414141414141 (f: 1094795648.000000, d: 2.261635e+06)
xmm6 4141414141414141 (f: 1094795648.000000, d: 2.261635e+06)
xmm7 f180c714f8e2a139 (f: 4175601920.000000, d: -5.462583e+238)
xmm8 00000000428e8000 (f: 1116635136.000000, d: 5.516911e-315)
xmm9 0000000000000000 (f: 0.000000, d: 0.000000e+00)
xmm10 0000000000000000 (f: 0.000000, d: 0.000000e+00)
xmm11 0000000000000000 (f: 0.000000, d: 0.000000e+00)
xmm12 0000000000000000 (f: 0.000000, d: 0.000000e+00)
xmm13 0000000000000000 (f: 0.000000, d: 0.000000e+00)
xmm14 0000000000000000 (f: 0.000000, d: 0.000000e+00)
xmm15 0000000000000000 (f: 0.000000, d: 0.000000e+00)
Target=2_60_20140106_181350 (Linux 3.0.93-0.8.2_1.0502.8048-cray_ari_c)
CPU=amd64 (48 logical CPUs) (0xfc0c5b000 RAM)
----------- Stack Backtrace -----------
(0x00002AAAAC2FA122 [libj9prt26.so+0x13122])
(0x00002AAAAC30779F [libj9prt26.so+0x2079f])
(0x00002AAAAC2F9E6B [libj9prt26.so+0x12e6b])
(0x00002AAAAC2F9F67 [libj9prt26.so+0x12f67])
(0x00002AAAAC30779F [libj9prt26.so+0x2079f])
(0x00002AAAAC2F9A8B [libj9prt26.so+0x12a8b])
(0x00002AAAABF52C9D [libj9vm26.so+0x1ac9d])
(0x00002AAAAC30779F [libj9prt26.so+0x2079f])
(0x00002AAAABF52F56 [libj9vm26.so+0x1af56])
(0x00002AAAABF96CA0 [libj9vm26.so+0x5eca0])
---------------------------------------
JVMDUMP039I
JVMDUMP032I


Note: this is still with the frame size I modified in my last email in this thread.
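
For reference, a minimal sketch of the frame-size arithmetic behind the errors quoted further down in this thread. It assumes, per the Spark 1.x configuration docs, that spark.akka.frameSize is given in MB (the 10485760-byte limit in the error message is exactly 10 MB). The helper name required_frame_size_mb is made up for illustration and is not a Spark API; the print calls assume Python 3.

```python
import math

BYTES_PER_MB = 1024 * 1024

def required_frame_size_mb(serialized_task_bytes):
    """Smallest whole number of MB that can hold a serialized task of this size."""
    return int(math.ceil(serialized_task_bytes / float(BYTES_PER_MB)))

# Limit reported in the error message, converted from bytes to MB.
reported_limit_mb = 10485760 // BYTES_PER_MB

# Task sizes taken from the two "exceeds spark.akka.frameSize" errors in this thread.
for task_bytes in (12062263, 401970046):
    print(task_bytes, "bytes needs a frame size of at least",
          required_frame_size_mb(task_bytes), "MB; the reported limit was",
          reported_limit_mb, "MB")
```

Since the limit in the error is still 10 MB after the config change, it may be worth checking that the setting is actually picked up, for example that the key is exactly "spark.akka.frameSize" with no trailing space inside the quotes.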

On Mon, Sep 15, 2014 at 2:12 AM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> Try:
>
> rdd = sc.broadcast(matrix)
>
> Or
>
> rdd = sc.parallelize(matrix,100) // Just increase the number of slices,
> give it a try.
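
To get a feel for why more slices might help, here is a rough, Spark-free sketch that splits a list of rows into num_slices chunks and measures the pickled size of the largest chunk. It assumes the serialized task size roughly tracks the size of the partition it carries; split_rows and largest_pickled_chunk are hypothetical helpers, and the small matrix is made-up data, not the matrix from this thread.

```python
import pickle

def split_rows(rows, num_slices):
    """Split rows into num_slices contiguous chunks of near-equal length."""
    n = len(rows)
    return [rows[(i * n) // num_slices:((i + 1) * n) // num_slices]
            for i in range(num_slices)]

def largest_pickled_chunk(rows, num_slices):
    """Size in bytes of the largest chunk after pickling with protocol 2."""
    return max(len(pickle.dumps(chunk, 2))
               for chunk in split_rows(rows, num_slices))

# Made-up stand-in for the real matrix: 10,000 rows of 20 floats each.
matrix = [[float(r * 20 + c) for c in range(20)] for r in range(10000)]

for num_slices in (5, 100, 1000):
    print(num_slices, "slices ->",
          largest_pickled_chunk(matrix, num_slices), "bytes in the largest chunk")
```

The per-chunk size shrinks as the slice count grows, which is the effect the suggestion above is relying on.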
>
>
>
> Thanks
> Best Regards
>
> On Mon, Sep 15, 2014 at 2:18 PM, Chengi Liu <chengi.liu...@gmail.com>
> wrote:
>
>> Hi Akhil,
>>   So with your config (specifically with set("spark.akka.frameSize ",
>> "10000000")) , I see the error:
>> org.apache.spark.SparkException: Job aborted due to stage failure:
>> Serialized task 0:0 was 401970046 bytes which exceeds spark.akka.frameSize
>> (10485760 bytes). Consider using broadcast variables for large values.
>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
>> at org.apache.spark
>>
>> So, I changed
>> set("spark.akka.frameSize ", "10000000") to set("spark.akka.frameSize ",
>> "1000000000"),
>> but now I get the same error as before:
>>
>> py4j.protocol.Py4JJavaError: An error occurred while calling o28.trainKMeansModel.
>> : org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up.
>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
>> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGSched
>>
>>
>> along with the following:
>> 14/09/15 01:44:11 WARN TaskSchedulerImpl: Initial job has not accepted
>> any resources; check your cluster UI to ensure that workers are registered
>> and have sufficient memory
>> 14/09/15 01:44:21 INFO AppClient$ClientActor: Connecting to master
>> spark://host:7077...
>> 14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to
>> akka.tcp://sparkMaster@host:7077:
>> akka.remote.EndpointAssociationException: Association failed with
>> [akka.tcp://sparkMaster@host:7077]
>> 14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to
>> akka.tcp://sparkMaster@host:7077:
>> akka.remote.EndpointAssociationException: Association failed with
>> [akka.tcp://sparkMaster@host:7077]
>> 14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to
>> akka.tcp://sparkMaster@host:7077:
>> akka.remote.EndpointAssociationException: Association failed with
>> [akka.tcp://sparkMaster@host:7077]
>> 14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to
>> akka.tcp://sparkMaster@host:7077:
>> akka.remote.EndpointAssociationException: Association failed with
>> [akka.tcp://sparkMaster@host:7077]
>> 14/09/15 01:44:26 WARN TaskSchedulerImpl: Initial job has not accepted
>> any resources; check your cluster UI to ensure that workers are registered
>> and have sufficient memory
>> 14/09/15 01:44:41 WARN TaskSchedulerImpl: Initial job has not accepted
>> any resources; check your cluster UI to ensure that workers are registered
>> and have sufficient memory
>> 14/09/15 01:44:41 ERROR SparkDeploySchedulerBackend: Application has been
>> killed. Reason: All masters are unresponsive! Giving up.
>>
>>
>> :-(
>>
>> On Mon, Sep 15, 2014 at 1:20 AM, Akhil Das <ak...@sigmoidanalytics.com>
>> wrote:
>>
>>> Can you give this a try:
>>>
>>>> conf = (SparkConf().set("spark.executor.memory", "32G")
>>>>         .set("spark.akka.frameSize ", "10000000")
>>>>         .set("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory"))
>>>> sc = SparkContext(conf = conf)
>>>> rdd = sc.parallelize(matrix,5)
>>>> from pyspark.mllib.clustering import KMeans
>>>> from math import sqrt
>>>> clusters = KMeans.train(rdd, 5, maxIterations=2,runs=2,
>>>> initializationMode="random")
>>>> def error(point):
>>>>     center = clusters.centers[clusters.predict(point)]
>>>>     return sqrt(sum([x**2 for x in (point - center)]))
>>>> WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y)
>>>> print "Within Set Sum of Squared Error = " + str(WSSSE)
>>>
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Mon, Sep 15, 2014 at 9:16 AM, Chengi Liu <chengi.liu...@gmail.com>
>>> wrote:
>>>
>>>> And the thing is, the code runs just fine if I reduce the number of rows in
>>>> my data.
>>>>
>>>> On Sun, Sep 14, 2014 at 8:45 PM, Chengi Liu <chengi.liu...@gmail.com>
>>>> wrote:
>>>>
>>>>> I am using Spark 1.0.2.
>>>>> This is my work cluster, so I can't set up a new version readily.
>>>>> But right now, I am not using broadcast.
>>>>>
>>>>>
>>>>> conf = SparkConf().set("spark.executor.memory",
>>>>> "32G").set("spark.akka.frameSize", "1000")
>>>>> sc = SparkContext(conf = conf)
>>>>> rdd = sc.parallelize(matrix,5)
>>>>>
>>>>> from pyspark.mllib.clustering import KMeans
>>>>> from math import sqrt
>>>>> clusters = KMeans.train(rdd, 5, maxIterations=2,runs=2,
>>>>> initializationMode="random")
>>>>> def error(point):
>>>>>     center = clusters.centers[clusters.predict(point)]
>>>>>     return sqrt(sum([x**2 for x in (point - center)]))
>>>>>
>>>>> WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y)
>>>>> print "Within Set Sum of Squared Error = " + str(WSSSE)
>>>>>
>>>>>
>>>>> executed by
>>>>> spark-submit --master $SPARKURL clustering_example.py
>>>>>  --executor-memory 32G  --driver-memory 60G
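
One thing that may be worth checking here: spark-submit treats everything after the application file as arguments to the application itself, so the two memory flags above would be handed to clustering_example.py rather than to spark-submit. Below is a small sketch of building the command with the options first. build_submit_command is a hypothetical helper, and the master URL is a placeholder standing in for $SPARKURL.

```python
def build_submit_command(master, script, executor_memory, driver_memory, app_args=()):
    """Return a spark-submit argv list with all spark-submit options before the script."""
    return (["spark-submit",
             "--master", master,
             "--executor-memory", executor_memory,
             "--driver-memory", driver_memory,
             script]
            + list(app_args))  # anything after the script goes to the application

# Placeholder master URL; substitute the real value of $SPARKURL.
cmd = build_submit_command("spark://host:7077", "clustering_example.py", "32G", "60G")
print(" ".join(cmd))
```

The resulting list can be passed to subprocess.call, or the printed line can be pasted into a shell.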
>>>>>
>>>>> and the error I see
>>>>> py4j.protocol.Py4JJavaError: An error occurred while calling o26.trainKMeansModel.
>>>>> : org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up.
>>>>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
>>>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
>>>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
>>>>> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
>>>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>>>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>>>>> at scala.Option.foreach(Option.scala:236)
>>>>> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
>>>>> at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>>>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>
>>>>>
>>>>> and
>>>>> 14/09/14 20:43:30 WARN AppClient$ClientActor: Could not connect to
>>>>> akka.tcp://sparkMaster@hostname:7077:
>>>>> akka.remote.EndpointAssociationException: Association failed with
>>>>> [akka.tcp://sparkMaster@hostname:7077]
>>>>>
>>>>> Any suggestions?
>>>>>
>>>>>
>>>>> On Sun, Sep 14, 2014 at 8:39 PM, Davies Liu <dav...@databricks.com>
>>>>> wrote:
>>>>>
>>>>>> Hey Chengi,
>>>>>>
>>>>>> What version of Spark are you using? There are big improvements to
>>>>>> broadcast in 1.1; could you try it?
>>>>>>
>>>>>> On Sun, Sep 14, 2014 at 8:29 PM, Chengi Liu <chengi.liu...@gmail.com>
>>>>>> wrote:
>>>>>> > Any suggestions? I am really blocked on this one.
>>>>>> >
>>>>>> > On Sun, Sep 14, 2014 at 2:43 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>>>>>> >>
>>>>>> >> And when I use the spark-submit script, I get the following error:
>>>>>> >>
>>>>>> >> py4j.protocol.Py4JJavaError: An error occurred while calling o26.trainKMeansModel.
>>>>>> >> : org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up.
>>>>>> >> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
>>>>>> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
>>>>>> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
>>>>>> >> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>> >> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>> >> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
>>>>>> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>>>>>> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>>>>>> >> at scala.Option.foreach(Option.scala:236)
>>>>>> >> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
>>>>>> >> at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
>>>>>> >> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>>>>> >> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>>>>> >> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>>>>> >> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>>>>> >> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>>>> >> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>> >> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>> >> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>> >> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>> >>
>>>>>> >>
>>>>>> >> My spark submit code is
>>>>>> >>
>>>>>> >> conf = SparkConf().set("spark.executor.memory",
>>>>>> >> "32G").set("spark.akka.frameSize", "1000")
>>>>>> >> sc = SparkContext(conf = conf)
>>>>>> >> rdd = sc.parallelize(matrix,5)
>>>>>> >>
>>>>>> >> from pyspark.mllib.clustering import KMeans
>>>>>> >> from math import sqrt
>>>>>> >> clusters = KMeans.train(rdd, 5, maxIterations=2,runs=2,
>>>>>> >> initializationMode="random")
>>>>>> >> def error(point):
>>>>>> >>     center = clusters.centers[clusters.predict(point)]
>>>>>> >>     return sqrt(sum([x**2 for x in (point - center)]))
>>>>>> >>
>>>>>> >> WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y)
>>>>>> >> print "Within Set Sum of Squared Error = " + str(WSSSE)
>>>>>> >>
>>>>>> >> which is executed as follows:
>>>>>> >> spark-submit --master $SPARKURL clustering_example.py --executor-memory 32G  --driver-memory 60G
>>>>>> >>
>>>>>> >> On Sun, Sep 14, 2014 at 10:47 AM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>>>>>> >>>
>>>>>> >>> How? An example, please.
>>>>>> >>> Also, if I am running this in the pyspark shell, how do I configure
>>>>>> >>> spark.akka.frameSize?
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> On Sun, Sep 14, 2014 at 7:43 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>> >>>>
>>>>>> >>>> When the data size is huge, you are better off using the
>>>>>> >>>> TorrentBroadcastFactory.
>>>>>> >>>>
>>>>>> >>>> Thanks
>>>>>> >>>> Best Regards
>>>>>> >>>>
>>>>>> >>>> On Sun, Sep 14, 2014 at 2:54 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>>>>>> >>>>>
>>>>>> >>>>> Specifically, this is the error I see when I try to operate on an RDD
>>>>>> >>>>> created by the sc.parallelize method:
>>>>>> >>>>> : org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>> >>>>> Serialized task 12:12 was 12062263 bytes which exceeds spark.akka.frameSize
>>>>>> >>>>> (10485760 bytes). Consider using broadcast variables for large values.
>>>>>> >>>>>
>>>>>> >>>>> On Sun, Sep 14, 2014 at 2:20 AM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>>>>>> >>>>>>
>>>>>> >>>>>> Hi,
>>>>>> >>>>>>    I am trying to create an RDD out of a large matrix. sc.parallelize
>>>>>> >>>>>> suggests using broadcast.
>>>>>> >>>>>> But when I do
>>>>>> >>>>>>
>>>>>> >>>>>> sc.broadcast(data)
>>>>>> >>>>>> I get this error:
>>>>>> >>>>>>
>>>>>> >>>>>> Traceback (most recent call last):
>>>>>> >>>>>>   File "<stdin>", line 1, in <module>
>>>>>> >>>>>>   File "/usr/common/usg/spark/1.0.2/python/pyspark/context.py", line 370, in broadcast
>>>>>> >>>>>>     pickled = pickleSer.dumps(value)
>>>>>> >>>>>>   File "/usr/common/usg/spark/1.0.2/python/pyspark/serializers.py", line 279, in dumps
>>>>>> >>>>>>     def dumps(self, obj): return cPickle.dumps(obj, 2)
>>>>>> >>>>>> SystemError: error return without exception set
>>>>>> >>>>>> Help?
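
A possible reading of the traceback above, offered as an assumption since the size of data is not given: this SystemError from cPickle.dumps is often reported under Python 2 when a single object is too large to pickle in one call. The sketch below is Spark-free and only illustrates bounding the size of each dumps call by pickling rows in batches. pickle_in_batches and unpickle_batches are hypothetical helpers, and the sample rows are made up.

```python
import pickle

def pickle_in_batches(rows, batch_size):
    """Yield one pickled blob (protocol 2) per batch of at most batch_size rows."""
    for start in range(0, len(rows), batch_size):
        yield pickle.dumps(rows[start:start + batch_size], 2)

def unpickle_batches(blobs):
    """Rebuild the full list of rows from the pickled batches, in order."""
    rows = []
    for blob in blobs:
        rows.extend(pickle.loads(blob))
    return rows

# Made-up sample rows standing in for the real matrix.
data = [[float(i), float(i) + 0.5] for i in range(1000)]

blobs = list(pickle_in_batches(data, 128))
restored = unpickle_batches(blobs)
print(len(blobs), "batches; round trip ok:", restored == data)
```

No single dumps call sees more than batch_size rows, so the largest pickled blob stays small regardless of the total number of rows.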
>>>>>> >>>>>>
>>>>>> >>>>>
>>>>>> >>>>
>>>>>> >>>
>>>>>> >>
>>>>>> >
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
