Hello,
I tried to adjust the number of blocks by repartitioning the input.
Here is how I do it (I am partitioning by user):

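from pyspark.mllib.recommendation import ALS, Rating

# Key each rating by user ID, then hash-partition by that key into 50 partitions.
tot = newrdd.map(
    lambda l: (l[1], Rating(int(l[1]), int(l[2]), l[4]))
).partitionBy(50).cache()
ratings = tot.values()
numIterations = 8
rank = 80
model = ALS.trainImplicit(ratings, rank, numIterations)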
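
Besides repartitioning the input, PySpark's `ALS.trainImplicit` also accepts a `blocks` keyword that sets the number of blocks directly. The following is a minimal sketch of that alternative, not a tested fix for the failure below. The wrapper name `train_implicit_with_blocks` and its argument check are hypothetical; only `ALS.trainImplicit` and its `iterations` and `blocks` keywords come from PySpark.

```python
def train_implicit_with_blocks(ratings, rank, iterations, blocks):
    """Train implicit-feedback ALS with an explicit block count.

    `ratings` is expected to be an RDD of Rating objects, as in the script
    above. This wrapper is a hypothetical helper; only ALS.trainImplicit
    itself is a PySpark API.
    """
    # -1 asks MLlib to choose the block count itself; otherwise require >= 1.
    if blocks != -1 and blocks < 1:
        raise ValueError("blocks must be -1 (auto) or a positive integer")
    # Imported lazily so the argument check runs even without a Spark install.
    from pyspark.mllib.recommendation import ALS
    return ALS.trainImplicit(ratings, rank, iterations=iterations, blocks=blocks)
```

With the variables from the script above, a call would look like `model = train_implicit_with_blocks(ratings, rank, numIterations, blocks=32)`, where 32 is only an example value to vary while watching the shuffle sizes.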


I have 20 executors with 5 GB of memory per executor.
When I use 80 factors I keep getting the following problem:

Traceback (most recent call last):
  File "/homes/afarahat/myspark/mm/df4test.py", line 85, in <module>
    model = ALS.trainImplicit(ratings, rank, numIterations)
  File "/homes/afarahat/aofspark/share/spark/python/lib/pyspark.zip/pyspark/mllib/recommendation.py", line 201, in trainImplicit
  File "/homes/afarahat/aofspark/share/spark/python/lib/pyspark.zip/pyspark/mllib/common.py", line 128, in callMLlibFunc
  File "/homes/afarahat/aofspark/share/spark/python/lib/pyspark.zip/pyspark/mllib/common.py", line 121, in callJavaFunc
  File "/homes/afarahat/aofspark/share/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/homes/afarahat/aofspark/share/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o113.trainImplicitALSModel.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 36.1 failed 4 times, most recent failure: Lost task 7.3 in stage 36.1 (TID 1841, gsbl52746.blue.ygrid.yahoo.com): java.io.FileNotFoundException: /grid/3/tmp/yarn-local/usercache/afarahat/appcache/application_1433921068880_1027774/blockmgr-0e518470-57d8-472f-8fba-3b593e4dda42/27/rdd_56_24 (No such file or directory)
        at java.io.RandomAccessFile.open(Native Method)
        at java.io.RandomAccessFile.<init>(RandomAccessFile.java:233)
        at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:110)
        at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
        at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:511)
        at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:429)
        at org.apache.spark.storage.BlockManager.get(BlockManager.scala:617)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:722)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Jun 28, 2015 2:10:37 AM INFO: parquet.hadoop.ParquetFileReader: Initiating action with parallelism: 5
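
For a rough sense of scale, the cached sizes in the storage table quoted below can be compared with the storage memory available to this 20-executor run. The sketch below is only an estimate under stated assumptions. It uses the legacy (Spark 1.x) rule that storage memory is the executor heap times `spark.storage.memoryFraction` times `spark.storage.safetyFraction`, with the latter assumed to be at its default of 0.9. It treats the full 5 GB as usable heap, and it reuses sizes from the earlier 80-executor run, which may not match this one. The function name `storage_budget_gb` is hypothetical.

```python
def storage_budget_gb(num_executors, executor_memory_gb,
                      storage_fraction=0.65, safety_fraction=0.9):
    """Approximate cluster-wide storage memory under the legacy memory model.

    storage_fraction mirrors spark.storage.memoryFraction from the quoted
    config; safety_fraction is assumed to be the default value of
    spark.storage.safetyFraction.
    """
    per_executor = executor_memory_gb * storage_fraction * safety_fraction
    return num_executors * per_executor


# "Size in Memory" column of the quoted storage table, split by unit.
cached_gb = [4.1, 5.6, 2.8, 35.0]       # ratingBlocks, RDD 38, userInBlocks, userFactors-1
cached_mb = [7.3, 8.4, 1455.3, 1062.7]  # itemOutBlocks, itemFactors-1, itemInBlocks, userOutBlocks

total_cached_gb = sum(cached_gb) + sum(cached_mb) / 1024.0
budget_gb = storage_budget_gb(num_executors=20, executor_memory_gb=5)

print("storage budget: %.1f GB" % budget_gb)
print("cached in earlier run: %.1f GB" % total_cached_gb)
```

Under these assumptions the 20-executor budget comes to roughly 58.5 GB, against about 50 GB cached in the earlier run, which would leave limited headroom if the sizes are similar here.
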
On Jun 26, 2015, at 12:33 PM, Xiangrui Meng <men...@gmail.com> wrote:

> So you have 100 partitions (blocks). This might be too many for your dataset. 
> Try setting a smaller number of blocks, e.g., 32 or 64. When ALS starts 
> iterations, you can see the shuffle read/write size from the "stages" tab of 
> Spark WebUI. Vary the number of blocks and check the numbers there. Kryo 
> serializer doesn't help much here. You can try disabling it (though I don't 
> think it caused the failure). -Xiangrui
> 
> On Fri, Jun 26, 2015 at 11:00 AM, Ayman Farahat <ayman.fara...@yahoo.com> 
> wrote:
> Hello,
> I checked my partitions/storage, and here is what I have.
> 
> I have 80 executors with 5 GB per executor.
> 
> Do I need to set additional parameters, say cores?
> 
> spark.serializer                 org.apache.spark.serializer.KryoSerializer
> # spark.driver.memory              5g
> # spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
> spark.shuffle.memoryFraction  0.3
> spark.storage.memoryFraction  0.65
> 
> 
> 
> RDD Name       Storage Level                      Cached Partitions  Fraction Cached  Size in Memory  Size in Tachyon  Size on Disk
> ratingBlocks   Memory Deserialized 1x Replicated  257                129%             4.1 GB          0.0 B            0.0 B
> itemOutBlocks  Memory Deserialized 1x Replicated  100                100%             7.3 MB          0.0 B            0.0 B
> 38             Memory Serialized 1x Replicated    193                97%              5.6 GB          0.0 B            0.0 B
> userInBlocks   Memory Deserialized 1x Replicated  100                100%             2.8 GB          0.0 B            0.0 B
> itemFactors-1  Memory Deserialized 1x Replicated  69                 69%              8.4 MB          0.0 B            0.0 B
> itemInBlocks   Memory Deserialized 1x Replicated  69                 69%              1455.3 MB       0.0 B            0.0 B
> userFactors-1  Memory Deserialized 1x Replicated  100                100%             35.0 GB         0.0 B            0.0 B
> userOutBlocks  Memory Deserialized 1x Replicated  100                100%             1062.7 MB       0.0 B            0.0 B
> 
> On Jun 26, 2015, at 8:26 AM, Xiangrui Meng <men...@gmail.com> wrote:
> 
>>  number of CPU cores or less.
> 
> 
