You can simply do rdd.repartition(1).saveAsTextFile(...). It might not be
efficient if your output data is huge, since a single task will be doing
all of the writing.
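
Note that saveAsTextFile always writes a directory, so even with
repartition(1) you end up with a directory holding a single part-00000 file.
If the single-task write turns out to be too slow, one alternative is to
save in parallel as usual and then concatenate the part files yourself. Below
is a minimal sketch using only the JDK, assuming the output sits on local
disk as in your setup. MergeParts and mergeParts are hypothetical names for
illustration, not Spark APIs.

```scala
import java.io.File
import java.nio.file.{Files, Path}

object MergeParts {
  // Concatenate every part-* file found directly in `dir` into `target`,
  // in file-name order. Marker files such as _SUCCESS and hidden .crc
  // files are skipped because their names do not start with "part-".
  def mergeParts(dir: Path, target: Path): Unit = {
    val parts = Option(dir.toFile.listFiles())
      .getOrElse(Array.empty[File])
      .filter(f => f.isFile && f.getName.startsWith("part-"))
      .sortBy(_.getName)

    // Creates the target, or truncates it if it already exists.
    val out = Files.newOutputStream(target)
    try {
      // Files.copy streams each part to `out`, so memory use stays
      // small regardless of how large the parts are.
      parts.foreach(f => Files.copy(f.toPath, out))
    } finally {
      out.close()
    }
  }
}
```

With that in place, something like rdd.saveAsTextFile("/tmp/out") followed by
MergeParts.mergeParts(Paths.get("/tmp/out"), Paths.get("/tmp/out.csv")) should
give one file (both paths are placeholders, and Paths comes from
java.nio.file). Keep the target outside the part directory so it is not
picked up as an input.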

Thanks
Best Regards

On Fri, Jun 5, 2015 at 3:46 PM, marcos rebelo <ole...@gmail.com> wrote:

> Hi all
>
> I'm running Spark on a single local machine, no Hadoop, just reading from
> and writing to local disk.
>
> I need to have a single file as output of my calculation.
>
> If I do "rdd.saveAsTextFile(...)" everything runs OK, but I get a lot of
> files. Since I need a single file, I was considering doing something like:
>
>       Try {new FileWriter(outputPath)} match {
>         case Success(writer) =>
>           try {
>             rdd.toLocalIterator.foreach { line =>
>               val str = line.toString
>               writer.write(str)
>             }
>           }
>         ...
>       }
>
>
> I get:
>
> [error] o.a.s.e.Executor - Exception in task 0.0 in stage 41.0 (TID 32)
> java.lang.OutOfMemoryError: Java heap space
>     at java.util.Arrays.copyOf(Arrays.java:3236) ~[na:1.8.0_45]
>     at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
> ~[na:1.8.0_45]
>     at
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> ~[na:1.8.0_45]
>     at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> ~[na:1.8.0_45]
>     at
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
> ~[na:1.8.0_45]
> [error] o.a.s.u.SparkUncaughtExceptionHandler - Uncaught exception in
> thread Thread[Executor task launch worker-1,5,main]
> java.lang.OutOfMemoryError: Java heap space
>     at java.util.Arrays.copyOf(Arrays.java:3236) ~[na:1.8.0_45]
>     at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
> ~[na:1.8.0_45]
>     at
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> ~[na:1.8.0_45]
>     at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> ~[na:1.8.0_45]
>     at
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
> ~[na:1.8.0_45]
> [error] o.a.s.s.TaskSetManager - Task 0 in stage 41.0 failed 1 times;
> aborting job
> [warn] application - Can't write to /tmp/err1433498283479.csv: {}
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 41.0 failed 1 times, most recent failure: Lost task 0.0 in stage
> 41.0 (TID 32, localhost): java.lang.OutOfMemoryError: Java heap space
>     at java.util.Arrays.copyOf(Arrays.java:3236)
>     at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
>     at
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>     at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>     at
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>     at
> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>     at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>     at
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
>     at
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
>     at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
>     at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
> ~[spark-core_2.10-1.3.1.jar:1.3.1]
>     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
> ~[spark-core_2.10-1.3.1.jar:1.3.1]
>     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
> ~[spark-core_2.10-1.3.1.jar:1.3.1]
>     at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> ~[scala-library-2.10.5.jar:na]
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> ~[scala-library-2.10.5.jar:na]
>
>
> If this rdd.toLocalIterator.foreach(...) doesn't work, what is a better
> solution?
>
> Best Regards
> Marcos
>
>
>
