Hi Stephan,
thank you for your explanation.
I thought I would get only about 100MB of results after the Cross, which
is why I used it.
I will try something else then, most likely a Map on the input.
Best,
Mihail
On 16.06.2015 04:27, Stephan Ewen wrote:
Cross is a quadratic operation. As such, it produces very large
results even on moderate inputs, which can easily exceed memory and
disk space if the subsequent operation needs to gather all the data
(such as the sort in your case).
If both inputs are 10 MB of 100-byte elements (100K elements per
input), you end up with 10 billion elements after the cross, which
is 1 TB in size (assuming the result elements are also 100 bytes).
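The arithmetic above can be checked with a few lines of plain Java. This is only a sketch: the class and method names (CrossSizeEstimate, crossElements, crossBytes) are made up for illustration and are not part of Flink, and it assumes decimal units (1 MB = 1,000,000 bytes).

```java
public class CrossSizeEstimate {

    /** Number of result elements of a Cross: every left element is paired with every right element. */
    static long crossElements(long leftCount, long rightCount) {
        // multiplyExact throws ArithmeticException on overflow instead of wrapping silently
        return Math.multiplyExact(leftCount, rightCount);
    }

    /** Total result size in bytes, assuming every result element has the same size. */
    static long crossBytes(long leftCount, long rightCount, long bytesPerResult) {
        return Math.multiplyExact(crossElements(leftCount, rightCount), bytesPerResult);
    }

    public static void main(String[] args) {
        long inputBytes = 10_000_000L;               // 10 MB per input (decimal units, an assumption)
        long elementBytes = 100L;                    // 100 bytes per element
        long perInput = inputBytes / elementBytes;   // 100,000 elements per input

        long pairs = crossElements(perInput, perInput);
        long bytes = crossBytes(perInput, perInput, elementBytes);

        // prints "10000000000 result elements, 1000000000000 bytes"
        System.out.println(pairs + " result elements, " + bytes + " bytes");
    }
}
```

The same two methods can be used to estimate other input sizes before running the job; the result size grows with the product of the two element counts, not with their sum.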
This is an inherent issue of using a quadratic operation on data
that is too large to be handled by a quadratic operation. There is
not much anyone can do about this.
Try to see whether you can replace the Cross operation with something
else (Join, CoGroup), or whether you can at least filter aggressively
after the Cross, before the next operation.
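To illustrate the filtering idea without depending on any Flink API, here is a plain-Java sketch. It is not Flink code: the class FilteredPairs, the method closePairs and the small distance matrix are hypothetical, and the predicate is only modelled on the one in the quoted cross function. The point is that a pair failing the test is dropped as soon as it is produced, so only the surviving pairs reach the next step.

```java
import java.util.ArrayList;
import java.util.List;

public class FilteredPairs {

    /**
     * Enumerates all (i, j) pairs, as a Cross would, but keeps only those with i != j
     * whose distances in both directions are within the threshold.
     */
    static List<int[]> closePairs(int[][] dist, int threshold) {
        List<int[]> kept = new ArrayList<>();
        for (int i = 0; i < dist.length; i++) {
            for (int j = 0; j < dist.length; j++) {
                // Filter immediately: rejected pairs are never stored
                if (i != j && dist[i][j] <= threshold && dist[j][i] <= threshold) {
                    kept.add(new int[] {i, j});
                }
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Hypothetical 3-vertex distance matrix, for illustration only
        int[][] dist = {
            {0, 1, 5},
            {1, 0, 2},
            {5, 2, 0},
        };
        List<int[]> kept = closePairs(dist, 2);
        // prints "4 of 9 pairs kept"
        System.out.println(kept.size() + " of " + (dist.length * dist.length) + " pairs kept");
    }
}
```

The number of pairs examined is still quadratic; what shrinks is the amount of data handed to whatever comes next (the sort, in the job above). How much it shrinks depends entirely on how selective the threshold is.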
On Mon, Jun 15, 2015 at 2:18 PM, Mihail Vieru
<vi...@informatik.hu-berlin.de> wrote:
Hi,
I get the following *"No space left on device" IOException* when
using the following Cross operator.
The inputs for the operator are each just *10MB* in size (same
input for IN1 and IN2; 1000 tuples) and I get the exception after
Flink manages to fill *50GB* of SSD space and the partition
becomes full.
I have found a similar problem in the mailing list here:
https://mail-archives.apache.org/mod_mbox/flink-user/201412.mbox/%3CCAN0XJzNiTyWDfcDLhsP6iJVhpUgnYn0ACy4ueS2R6YSB68Fr%3DA%40mail.gmail.com%3E
As I currently don't have any more free file system space left,
specifying other temporary folders for Flink is not an option.
Any ideas on what could help?
I'm using the latest 0.9-SNAPSHOT and run the job in a local
execution environment.
Best,
Mihail
java.lang.Exception: The data preparation for task 'GroupReduce (GroupReduce at main(APSPNaiveVernicaJob.java:100))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:607)
    at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
    at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:785)
Caused by: java.io.IOException: No space left on device
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:65)
    at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:205)
    at org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest.write(AsynchronousFileIOChannel.java:340)
    at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:471)

public static class crossKAPSPFilter implements
        CrossFunction<Vertex<Integer, Tuple2<Integer[], String>>,
                      Vertex<Integer, Tuple2<Integer[], String>>,
                      Tuple2<Integer, String>> {

    @Override
    public Tuple2<Integer, String> cross(
            Vertex<Integer, Tuple2<Integer[], String>> vertex1,
            Vertex<Integer, Tuple2<Integer[], String>> vertex2) throws Exception {

        int vertexIdFirst = vertex1.f0;
        int vertexIdSecond = vertex2.f0;
        Integer[] vertexDistanceVectorFirst = vertex1.f1.f0;
        Integer[] vertexDistanceVectorSecond = vertex2.f1.f0;

        if (vertexIdFirst != vertexIdSecond
                && vertexDistanceVectorFirst[vertexIdSecond] <= grapDistThreshold
                && vertexDistanceVectorSecond[vertexIdFirst] <= grapDistThreshold) {
            return new Tuple2<Integer, String>(vertex1.f0, vertex1.f1.f1);
        }
        else return null;
    }
}