Hello Fabian,

Thanks for your attention. I still haven't solved this. I did set up a cron job to clean the Docker images daily - thanks for that hint. As a last resort, I am going to look into a 2 TB NAS to see if that solves it.
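
In case it helps someone else on the list, a minimal sketch of such a daily cleanup (this assumes stopped containers and dangling images are what fill the disk; the exact filters may need adjusting):

    #!/bin/sh
    # e.g. dropped into /etc/cron.daily/ as docker-cleanup
    # remove exited containers, then any dangling (untagged) images
    docker ps -aq -f status=exited | xargs -r docker rm
    docker images -q -f dangling=true | xargs -r docker rmi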
What is confusing me is that this also happens for the com-orkut.ungraph.txt dataset, which is much smaller than com-friendster.ungraph.txt and not that much bigger than com-dblp.ungraph.txt.

DBLP - I am able to run this on one TaskManager container.
https://snap.stanford.edu/data/com-DBLP.html
Nodes: 317080 (~0.3 M)
Edges: 1049866 (~1 M)

Orkut - no, fails with the disk space error.
https://snap.stanford.edu/data/com-Orkut.html
Nodes: 3072441 (~3 M)
Edges: 117185083 (~117 M)

Friendster - no, fails with the disk space error.
https://snap.stanford.edu/data/com-Friendster.html
Nodes: 65608366 (~65 M)
Edges: 1806067135 (~1800 M)

For testing purposes, I'm using a JobManager (in its own Docker container) and a single TaskManager (in its own Docker container) with the following config parameters.

Heap is currently configured to 6 GB:

taskmanager.heap.mb: 6000

Parallelism is set as such:

taskmanager.numberOfTaskSlots: 1
parallelism.default: 1

It is my understanding that if I want to test, for example, N = 3 TaskManagers (each in its own Docker container) with minimum parallelism within each, I would use:

taskmanager.numberOfTaskSlots: 1
parallelism.default: 3

Fabian, do you think you could help me estimate how much disk space would be required to compute the Orkut dataset, for example? I am running a Flink 1.1.3 Docker cluster with a single TaskManager.

This is the code I am using to read the SNAP datasets and to test with Orkut, Friendster and DBLP, in case you have a minute to inspect it and see if something is amiss:

public class App {
    public static void main(String[] args) {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        final String dataPath = args[0];

        final DataSet<Tuple2<Long, Long>> edgeTuples = env.readCsvFile(dataPath)
                .fieldDelimiter("\t")   // node IDs are separated by tabs
                .ignoreComments("#")    // comments start with "#"
                .types(Long.class, Long.class);

        // Dealing with an undirected graph, so we call .getUndirected() at the end.
        final Graph<Long, Long, NullValue> graph = Graph.fromTuple2DataSet(
                edgeTuples,
                new MapFunction<Long, Long>() {
                    private static final long serialVersionUID = 8713516577419451509L;
                    public Long map(Long value) {
                        return value;
                    }
                },
                env
        ).getUndirected();

        try {
            // Generate a unique ID value for each vertex.
            // Based on https://github.com/apache/flink/blob/master/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/MusicProfiles.java
            DataSet<Tuple2<Long, Long>> idsWithInitialLabels = DataSetUtils
                    .zipWithUniqueId(graph.getVertexIds())
                    .map(new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                        private static final long serialVersionUID = -6348050104902440929L;
                        @Override
                        public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple2) throws Exception {
                            return new Tuple2<Long, Long>(tuple2.f1, tuple2.f0);
                        }
                    });

            // Build the graph with initialization values.
            final Graph<Long, Long, NullValue> graphWithIDs = graph
                    .joinWithVertices(idsWithInitialLabels,
                            new VertexJoinFunction<Long, Long>() {
                                private static final long serialVersionUID = -315275119763760820L;
                                public Long vertexJoin(Long vertexValue, Long inputValue) {
                                    return inputValue;
                                }
                            });

            // Execute LabelPropagation over it.
            DataSet<Vertex<Long, Long>> result =
                    graphWithIDs.run(new LabelPropagation<Long, Long, NullValue>(10));

            graph.getVertices().print();
            TimeUnit.SECONDS.sleep(2);

            System.out.println("graphWithIDs");
            graphWithIDs.getVertices().print();
            graphWithIDs.getEdges().print();
            TimeUnit.SECONDS.sleep(2);

            // Group vertices by similar communities.
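            // (Sketch - not part of my original job.) If collect() below turns out to be
            // too heavy for the larger graphs, I assume the community sizes could instead
            // be computed on the cluster by grouping on the label and summing per-label counts:
            DataSet<Tuple2<Long, Long>> communitySizes = result
                    .map(new MapFunction<Vertex<Long, Long>, Tuple2<Long, Long>>() {
                        @Override
                        public Tuple2<Long, Long> map(Vertex<Long, Long> vertex) {
                            return new Tuple2<Long, Long>(vertex.getValue(), 1L);
                        }
                    })
                    .groupBy(0)  // group by community label
                    .sum(1);     // number of vertices per label
            communitySizes.print();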
            final List<Vertex<Long, Long>> collected = result.collect();
            final HashMap<Long, ArrayList<Long>> commSizes = new HashMap<Long, ArrayList<Long>>();

            for (Vertex<Long, Long> v : collected) {
                //System.out.println("collected[v] = id:" + v.getId() + "\tval:" + v.getValue());
                if (!commSizes.containsKey(v.getValue())) {
                    commSizes.put(v.getValue(), new ArrayList<Long>());
                }
                commSizes.get(v.getValue()).add(v.getId());
            }

            System.out.println("#communities:\t" + commSizes.keySet().size()
                    + "\n|result|:\t" + result.count()
                    + "\n|collected|:\t" + collected.size());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Thanks for your time,

Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
Skype: miguel.e.coimbra


---------- Forwarded message ----------
> From: Fabian Hueske <fhue...@gmail.com>
> To: user@flink.apache.org
> Cc:
> Date: Mon, 5 Dec 2016 08:40:04 +0100
> Subject: Re: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device
>
> Hi Miguel,
>
> have you found a solution to your problem?
> I'm not a Docker expert, but this forum thread looks like it could be related
> to your problem [1].
>
> Best,
> Fabian
>
> [1] https://forums.docker.com/t/no-space-left-on-device-error/10894
>
> 2016-12-02 17:43 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>
>> Hello Fabian,
>>
>> I have created a directory in my host machine's user directory
>> (/home/myuser/mydir) and I am mapping it as a volume with Docker for the
>> TaskManager and JobManager containers.
>> Each container will thus have the following directory: /home/flink/htmp
>>
>> host ---> container
>> /home/myuser/mydir ---> /home/flink/htmp
>>
>> I had previously done this successfully with a host directory which
>> holds several SNAP datasets.
>> In the Flink configuration file, I specified /home/flink/htmp to be used
>> as the tmp dir for the TaskManager.
>> This seems to be working, as I was able to start the cluster and invoke
>> Flink for the Friendster dataset.
>>
>> However, during execution, there were 2 intermediate files which kept
>> growing until they reached about 30 GB.
>> At that point, the Flink TaskManager threw the exception again:
>>
>> java.lang.RuntimeException: Error obtaining the sorted input: Thread
>> 'SortMerger spilling thread' terminated due to an exception: No space left
>> on device
>>
>> Here is an ls excerpt of the directory on the host (to which the
>> TaskManager container was also writing successfully) shortly before the
>> exception:
>>
>> 31G  9d177a1971322263f1597c3378885ccf.channel
>> 31G  a693811249bc5f785a79d1b1b537fe93.channel
>>
>> Now I believe the host system is capable of storing hundreds of GB more,
>> so I am confused as to what the problem might be.
>>
>> Best regards,
>>
>> Miguel E. Coimbra
>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>> Skype: miguel.e.coimbra
>>
>>
>>> Hi Miguel,
>>>
>>> the exception does indeed indicate that the process ran out of available
>>> disk space.
>>> The quoted paragraph of the blog post describes the situation in which
>>> you receive the IOE.
>>>
>>> By default the system's default tmp dir is used. I don't know which
>>> folder that would be in a Docker setup.
>>> You can configure the temp dir using the taskmanager.tmp.dirs config key.
>>> Please see the configuration documentation for details [1].
>>>
>>> Hope this helps,
>>> Fabian
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/config.html#jobmanager-amp-taskmanager
>>>
>>> 2016-12-02 0:18 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>>>
>>>> Hello,
>>>>
>>>> I have a problem for which I hope someone will be able to give a hint.
>>>> I am running the Flink standalone cluster with 2 Docker containers
>>>> (1 TaskManager and 1 JobManager), using 1 TaskManager with 30 GB of RAM.
>>>>
>>>> The dataset is a large one: SNAP Friendster, which has around 1800 M edges.
>>>> https://snap.stanford.edu/data/com-Friendster.html
>>>>
>>>> I am trying to run the Gelly built-in label propagation algorithm on
>>>> top of it.
>>>> As this is a very big dataset, I believe I am exceeding the available
>>>> RAM and that the system is using secondary storage, which then fails:
>>>>
>>>> Connected to JobManager at Actor[akka.tcp://flink@172.19.0.2:6123/user/jobmanager#894624508]
>>>> 12/01/2016 17:58:24   Job execution switched to status RUNNING.
>>>> 12/01/2016 17:58:24   DataSource (at main(App.java:33) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to SCHEDULED
>>>> 12/01/2016 17:58:24   DataSource (at main(App.java:33) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to DEPLOYING
>>>> 12/01/2016 17:58:24   DataSource (at main(App.java:33) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to RUNNING
>>>> 12/01/2016 17:58:24   Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to SCHEDULED
>>>> 12/01/2016 17:58:24   Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to DEPLOYING
>>>> 12/01/2016 17:58:24   Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to RUNNING
>>>> 12/01/2016 17:59:51   Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to FAILED
>>>> java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device
>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1098)
>>>>     at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:86)
>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
>>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device
>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>>> Caused by: java.io.IOException: No space left on device
>>>>     at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>>>>     at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
>>>>     at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>>>>     at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>>>>     at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:211)
>>>>     at org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest.write(AsynchronousFileIOChannel.java:344)
>>>>     at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:502)
>>>>
>>>> I do not have
>>>> secondary storage limitations on the host system, so I believe the system
>>>> would be able to handle whatever is spilled to the disk...
>>>> Perhaps this is a Docker limitation regarding the usage of the host's
>>>> secondary storage?
>>>>
>>>> Or is there perhaps some configuration or setting for the TaskManager
>>>> which I am missing?
>>>> Running the label propagation of Gelly on this dataset and cluster
>>>> configuration, what would be the expected behavior if the system consumes
>>>> all the memory?
>>>>
>>>> I believe the SortMerger thread is associated with the following
>>>> mechanism described in this blog post:
>>>>
>>>> https://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html
>>>>
>>>> "The Sort-Merge-Join works by first sorting both input data sets on
>>>> their join key attributes (Sort Phase) and merging the sorted data sets as
>>>> a second step (Merge Phase). The sort is done in-memory if the local
>>>> partition of a data set is small enough. Otherwise, an external merge-sort
>>>> is done by collecting data until the working memory is filled, sorting it,
>>>> writing the sorted data to the local filesystem, and starting over by
>>>> filling the working memory again with more incoming data. After all input
>>>> data has been received, sorted, and written as sorted runs to the local
>>>> file system, a fully sorted stream can be obtained. This is done by reading
>>>> the partially sorted runs from the local filesystem and sort-merging the
>>>> records on the fly. Once the sorted streams of both inputs are available,
>>>> both streams are sequentially read and merge-joined in a zig-zag fashion by
>>>> comparing the sorted join key attributes, building join element pairs for
>>>> matching keys, and advancing the sorted stream with the lower join key."
>>>>
>>>> I am still investigating the possibility that Docker is at fault
>>>> regarding secondary storage limitations, but how would I go about
>>>> estimating the amount of disk space needed for this spilling on this
>>>> dataset?
>>>>
>>>> Thanks for your time,
>>>>
>>>> My best regards,
>>>>
>>>> Miguel E. Coimbra
>>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>>> Skype: miguel.e.coimbra
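
P.S. Regarding the estimate I asked for above, here is my own very rough back-of-envelope so far (a sketch only - it assumes ~16 bytes per serialized edge record, i.e. two longs, and ignores sort buffers and per-record overhead):

    Orkut:       117,185,083 edges x 2 (getUndirected)  ~ 234 M edge records
                 234 M x 16 B                           ~ 3.7 GB per materialized copy of the edge set
    Friendster:  1,806,067,135 edges x 2                ~ 3.6 G edge records
                 3.6 G x 16 B                           ~ 58 GB per materialized copy of the edge set

Since label propagation repeatedly joins the edge set with the vertex/label set, I assume the runtime may keep more than one such sorted copy spilled on disk at the same time, so the actual temp space needed is presumably a small multiple of these figures - which would at least be consistent with the two ~31 GB .channel files I observed for Friendster.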