Hi all,

I have to say that this solution surprises me. This is the first time I have had such a requirement, but I would have expected a more elegant solution.
I'm sure many people are doing the same as I am, and they would love a better solution to this problem.

Best Regards
Marcos

On Fri, Jun 5, 2015 at 1:16 PM, ayan guha <guha.a...@gmail.com> wrote:

> Another option is to merge the part files after your app ends.
>
> On 5 Jun 2015 20:37, "Akhil Das" <ak...@sigmoidanalytics.com> wrote:
>
>> You can simply do rdd.repartition(1).saveAsTextFile(...). It might not be
>> efficient if your output data is huge, since one task will be doing the
>> whole writing.
>>
>> Thanks
>> Best Regards
>>
>> On Fri, Jun 5, 2015 at 3:46 PM, marcos rebelo <ole...@gmail.com> wrote:
>>
>>> Hi all
>>>
>>> I'm running Spark on a single local machine, no Hadoop, just reading and
>>> writing on local disk.
>>>
>>> I need a single file as the output of my calculation.
>>>
>>> If I do rdd.saveAsTextFile(...) everything runs fine, but I get a lot of
>>> files. Since I need a single file, I was considering doing something like:
>>>
>>> Try { new FileWriter(outputPath) } match {
>>>   case Success(writer) =>
>>>     try {
>>>       rdd.toLocalIterator.foreach { line =>
>>>         val str = line.toString
>>>         writer.write(str)
>>>       }
>>>     }
>>>     ...
>>> }
>>>
>>> I get:
>>>
>>> [error] o.a.s.e.Executor - Exception in task 0.0 in stage 41.0 (TID 32)
>>> java.lang.OutOfMemoryError: Java heap space
>>>   at java.util.Arrays.copyOf(Arrays.java:3236) ~[na:1.8.0_45]
>>>   at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118) ~[na:1.8.0_45]
>>>   at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) ~[na:1.8.0_45]
>>>   at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) ~[na:1.8.0_45]
>>>   at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877) ~[na:1.8.0_45]
>>> [error] o.a.s.u.SparkUncaughtExceptionHandler - Uncaught exception in
>>> thread Thread[Executor task launch worker-1,5,main]
>>> java.lang.OutOfMemoryError: Java heap space
>>>   at java.util.Arrays.copyOf(Arrays.java:3236) ~[na:1.8.0_45]
>>>   at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118) ~[na:1.8.0_45]
>>>   at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) ~[na:1.8.0_45]
>>>   at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) ~[na:1.8.0_45]
>>>   at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877) ~[na:1.8.0_45]
>>> [error] o.a.s.s.TaskSetManager - Task 0 in stage 41.0 failed 1 times;
>>> aborting job
>>> [warn] application - Can't write to /tmp/err1433498283479.csv: {}
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>> 0 in stage 41.0 failed 1 times, most recent failure: Lost task 0.0 in stage
>>> 41.0 (TID 32, localhost): java.lang.OutOfMemoryError: Java heap space
>>>   at java.util.Arrays.copyOf(Arrays.java:3236)
>>>   at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
>>>   at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>>>   at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>>>   at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>>>   at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
>>>   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>   at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
>>>   at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
>>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>   at java.lang.Thread.run(Thread.java:745)
>>>
>>> Driver stacktrace:
>>>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204) ~[spark-core_2.10-1.3.1.jar:1.3.1]
>>>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193) ~[spark-core_2.10-1.3.1.jar:1.3.1]
>>>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) ~[spark-core_2.10-1.3.1.jar:1.3.1]
>>>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) ~[scala-library-2.10.5.jar:na]
>>>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) ~[scala-library-2.10.5.jar:na]
>>>
>>> If this rdd.toLocalIterator.foreach(...) doesn't work, what is a better solution?
>>>
>>> Best Regards
>>> Marcos
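
[Editor's note] For readers finding this thread later, a minimal sketch of the two suggestions quoted above (collapse to one partition, then merge the part files after the app ends), assuming an RDD[String] called `rdd` and local output paths; the helper names and the part-file layout are assumptions for illustration, not code from the thread:

    import java.io.File
    import java.nio.file.{Files, Paths, StandardOpenOption}
    import org.apache.spark.rdd.RDD

    // Akhil's suggestion: collapse to one partition so Spark writes a single
    // part file. coalesce(1) skips the shuffle that repartition(1) performs,
    // but either way one task ends up doing the whole write.
    def saveAsSingleDirectory(rdd: RDD[String], outputDir: String): Unit =
      rdd.coalesce(1).saveAsTextFile(outputDir)

    // ayan's suggestion: once the app has finished, concatenate the
    // part-NNNNN files that saveAsTextFile left in the output directory
    // into one local file (each part file already ends with a newline).
    def mergePartFiles(outputDir: String, mergedFile: String): Unit = {
      val parts = new File(outputDir).listFiles()
        .filter(_.getName.startsWith("part-"))
        .sortBy(_.getName)
      val out = Files.newOutputStream(Paths.get(mergedFile),
        StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)
      try parts.foreach(p => Files.copy(p.toPath, out))
      finally out.close()
    }

Note that saveAsTextFile still produces a directory (part-00000 plus _SUCCESS), which is why the merge step, or a rename of the single part file, is needed afterwards.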
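[Editor's note] If the toLocalIterator route is preferred instead, the stack trace suggests the OOM occurs while an executor serializes one partition's result back to the driver, so a possible workaround is to split the data into more, smaller partitions before iterating and to close the writer in a finally block. The sketch below is a guess along those lines; `rdd`, `outputPath`, and the partition count are assumptions:

    import java.io.{BufferedWriter, FileWriter}
    import org.apache.spark.rdd.RDD

    // Hypothetical variant of the snippet from the original question.
    // toLocalIterator pulls one partition at a time to the driver, so each
    // partition must fit in memory when it is serialized and transferred;
    // many small partitions keep each of those transfers small.
    def writeViaDriver(rdd: RDD[String], outputPath: String, numParts: Int = 200): Unit = {
      val writer = new BufferedWriter(new FileWriter(outputPath))
      try {
        rdd.repartition(numParts).toLocalIterator.foreach { line =>
          writer.write(line)
          writer.newLine()
        }
      } finally {
        writer.close()
      }
    }

The repartition call does add a shuffle, but it trades that cost for not having to hold any single large partition in the heap at once.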