Hi Miguel,

Estimating the space requirements is not trivial. It depends, of course, on
the algorithm and the data itself. I'm not an expert in graph algorithms
and don't know your datasets.
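
That said, a rough lower bound is easy to sketch. The snippet below assumes
that each edge is spilled as two 8-byte long IDs and that getUndirected()
doubles the number of edges. The class and method names are made up for
illustration. Real usage can be higher, because serialization overhead is
ignored and a job may sort the data more than once. Under these assumptions,
one sort of the edge set needs about 58 GB for Friendster and about 3.7 GB
for Orkut.

```java
public class SpillEstimate {
    // Assumption: an edge is written as two 8-byte long vertex IDs.
    // Serializer and file-format overhead are ignored, so this is a lower bound.
    static final long BYTES_PER_EDGE = 2L * Long.BYTES;

    /** Rough lower bound, in bytes, for spilling every edge once. */
    static long estimateSortBytes(long edgesInFile, boolean undirected) {
        // getUndirected() emits every edge plus its reverse, doubling the count.
        long edges = undirected ? 2L * edgesInFile : edgesInFile;
        return edges * BYTES_PER_EDGE;
    }

    public static void main(String[] args) {
        // Edge counts as listed on the SNAP pages quoted in this thread.
        String[] names = {"DBLP", "Orkut", "Friendster"};
        long[] edgeCounts = {1049866L, 117185083L, 1806067135L};
        for (int i = 0; i < names.length; i++) {
            long bytes = estimateSortBytes(edgeCounts[i], true);
            System.out.printf("%-10s ~%.1f GB%n", names[i], bytes / 1e9);
        }
    }
}
```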

But have you tried running the algorithm in a non-dockerized environment?
That might help to figure out whether this is an issue with the Docker
configuration rather than with Flink.
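
To compare the two environments, it can help to check how much space the JVM
actually sees in the directory Flink spills to, once inside the TaskManager
container and once on the host. This is only a minimal sketch with a
hypothetical class name. It falls back to java.io.tmpdir, on the assumption
that this is where spill files go when taskmanager.tmp.dirs is not set.

```java
import java.io.File;

public class TmpSpaceCheck {
    /** Bytes the JVM reports as usable in the partition that holds dir. */
    static long usableBytes(String dir) {
        return new File(dir).getUsableSpace();
    }

    public static void main(String[] args) {
        // Pass the spill directory as the first argument; otherwise use the
        // JVM's default temp directory (an assumption, see above).
        String dir = args.length > 0 ? args[0] : System.getProperty("java.io.tmpdir");
        File f = new File(dir);
        System.out.printf("%s: %.1f GB usable of %.1f GB total%n",
                f.getAbsolutePath(), usableBytes(dir) / 1e9, f.getTotalSpace() / 1e9);
    }
}
```

Running it with /home/flink/htmp inside the container and with the mapped
host directory outside should show whether both see the same free space.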

By the way, if you want to run with a parallelism of 3, you need at least
three slots: either three slots in one TM or one slot in each of three TMs.
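
As a sketch of that slot arithmetic (the class and method names are only
illustrative): the total number of slots is the number of TaskManagers times
taskmanager.numberOfTaskSlots, and it has to be at least the job's
parallelism.

```java
public class SlotCheck {
    /** Total slots offered by a cluster of identical TaskManagers. */
    static int totalSlots(int taskManagers, int slotsPerTaskManager) {
        return taskManagers * slotsPerTaskManager;
    }

    /** True if the cluster offers at least as many slots as the parallelism. */
    static boolean canRun(int parallelism, int taskManagers, int slotsPerTaskManager) {
        return totalSlots(taskManagers, slotsPerTaskManager) >= parallelism;
    }

    public static void main(String[] args) {
        System.out.println(canRun(3, 1, 1)); // one TM, one slot: false
        System.out.println(canRun(3, 1, 3)); // one TM, three slots: true
        System.out.println(canRun(3, 3, 1)); // three TMs, one slot each: true
    }
}
```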

Best,
Fabian

2016-12-05 17:20 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:

> Hello Fabian,
>
> Thanks for the attention. Still haven't solved this.
> I did set up a cron job to clean the Docker images daily - thanks for that
> hint.
> As a last resort, I am going to look into a 2 TB NAS to see if this works.
>
> What is confusing me is that this happens also for the
> com-orkut.ungraph.txt dataset which is much smaller than
> com-friendster.ungraph.txt but not that much bigger than the
> com-dblp.ungraph.txt.
>
> DBLP - I am able to run the DBLP on one TaskManager container.
> https://snap.stanford.edu/data/com-DBLP.html
> Nodes 317080  ~0.3 M
> Edges 1049866 ~ 1 M
>
> Orkut - fails with the "No space left on device" error.
> https://snap.stanford.edu/data/com-Orkut.html
> Nodes 3072441 ~3 M
> Edges 117185083 ~ 117 M
>
> Friendster - fails with the "No space left on device" error.
> https://snap.stanford.edu/data/com-Friendster.html
> Nodes 65608366 ~65 M
> Edges 1806067135 ~ 1800 M
>
> For testing purposes, I'm using a JobManager (in its own Docker
> container), a single TaskManager (in its own Docker container) with the
> following config parameters:
>
> Heap is currently configured to 6 GB:
> taskmanager.heap.mb: 6000
>
> Parallelism is set as such:
>
> taskmanager.numberOfTaskSlots: 1
> parallelism.default: 1
>
> It is my understanding that if I want to test for example N = 3
> TaskManagers (each in its own Docker container) with minimum parallelism
> within each, I would use:
>
> taskmanager.numberOfTaskSlots: 1
> parallelism.default: 3
>
>
> Fabian, do you think you could help estimate how much disk space would be
> required to compute the Orkut data set for example?
> I am running a Flink 1.1.3 Docker cluster with a single TaskManager.
> This is the code I am using to read SNAP datasets and to test with Orkut,
> Friendster and DBLP, in case you have a minute to inspect it and see if
> something is amiss:
>
> import java.util.ArrayList;
> import java.util.HashMap;
> import java.util.List;
> import java.util.concurrent.TimeUnit;
>
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.utils.DataSetUtils;
> import org.apache.flink.graph.Graph;
> import org.apache.flink.graph.Vertex;
> import org.apache.flink.graph.VertexJoinFunction;
> import org.apache.flink.graph.library.LabelPropagation;
> import org.apache.flink.types.NullValue;
>
> public class App {
>     public static void main(String[] args) {
>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         final String dataPath = args[0];
>
>         final DataSet<Tuple2<Long, Long>> edgeTuples = env.readCsvFile(dataPath)
>             .fieldDelimiter("\t") // node IDs are separated by tabs
>             .ignoreComments("#")  // comments start with "#"
>             .types(Long.class, Long.class);
>
>         // Dealing with an undirected graph, so we call .getUndirected() at the end.
>         final Graph<Long, Long, NullValue> graph = Graph.fromTuple2DataSet(
>             edgeTuples,
>             new MapFunction<Long, Long>() {
>                 private static final long serialVersionUID = 8713516577419451509L;
>                 private long test = 1L;
>                 public Long map(Long value) {
>                     return value;
>                 }
>             },
>             env
>         ).getUndirected();
>
>
>         try {
>             // Generate a unique ID value for each vertex.
>             // Based on https://github.com/apache/flink/blob/master/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/MusicProfiles.java
>             DataSet<Tuple2<Long, Long>> idsWithInitialLabels = DataSetUtils.zipWithUniqueId(graph.getVertexIds())
>                 .map(
>                     new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
>                         private static final long serialVersionUID = -6348050104902440929L;
>
>                         @Override
>                         public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple2) throws Exception {
>                             return new Tuple2<Long, Long>(tuple2.f1, tuple2.f0);
>                         }
>                     }
>                 );
>
>             // Build the graph with initialization values.
>             final Graph<Long, Long, NullValue> graphWithIDs = graph
>                 .joinWithVertices(idsWithInitialLabels,
>                     new VertexJoinFunction<Long, Long>() {
>                         private static final long serialVersionUID = -315275119763760820L;
>                         public Long vertexJoin(Long vertexValue, Long inputValue) {
>                             return inputValue;
>                         }
>                 });
>
>             // Execute LabelPropagation over it.
>             DataSet<Vertex<Long, Long>> result = graphWithIDs.run(new LabelPropagation<Long, Long, NullValue>(10));
>
>             graph.getVertices().print();
>
>             TimeUnit.SECONDS.sleep(2);
>
>             System.out.println("graphWithIDs");
>             graphWithIDs.getVertices().print();
>             graphWithIDs.getEdges().print();
>
>             TimeUnit.SECONDS.sleep(2);
>
>             // Group vertices by similar communities.
>             final List<Vertex<Long, Long>> collected = result.collect();
>             final HashMap<Long, ArrayList<Long>> commSizes = new HashMap<Long, ArrayList<Long>>();
>             for(Vertex<Long, Long> v : collected) {
>                 //System.out.println("collected[v] = id:" + v.getId() + "\tval:" + v.getValue());
>                 if(!commSizes.containsKey(v.getValue())) {
>                     commSizes.put(v.getValue(), new ArrayList<Long>());
>                 }
>                 commSizes.get(v.getValue()).add(v.getId());
>             }
>
>
>             System.out.println("#communities:\t" + commSizes.keySet().size() + "\n|result|:\t" + result.count() + "\n|collected|:\t" + collected.size());
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>     }
> }
>
> Thanks for your time,
>
>
> Miguel E. Coimbra
> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
> Skype: miguel.e.coimbra
>
> ---------- Forwarded message ----------
>
>> From: Fabian Hueske <fhue...@gmail.com>
>> To: user@flink.apache.org
>> Cc:
>> Date: Mon, 5 Dec 2016 08:40:04 +0100
>> Subject: Re: Thread 'SortMerger spilling thread' terminated due to an exception:
>> No space left on device
>>
>> Hi Miguel,
>>
>> have you found a solution to your problem?
>> I'm not a Docker expert, but this forum thread looks like it could be
>> related to your problem [1].
>>
>> Best,
>> Fabian
>>
>> [1] https://forums.docker.com/t/no-space-left-on-device-error/10894
>>
>> 2016-12-02 17:43 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>>
>>> Hello Fabian,
>>>
>>> I have created a directory in my user directory on the host machine
>>> (/home/myuser/mydir) and I am mapping it as a volume with Docker for the
>>> TaskManager and JobManager containers.
>>> Each container will thus have the following directory /home/flink/htmp
>>>
>>> host ---> container
>>> /home/myuser/mydir ---> /home/flink/htmp
>>>
>>> I had previously done this successfully with a host directory which
>>> holds several SNAP data sets.
>>> In the Flink configuration file, I specified /home/flink/htmp to be used
>>> as the tmp dir for the TaskManager.
>>> This seems to be working, as I was able to start the cluster and invoke
>>> Flink for that Friendster dataset.
>>>
>>> However, during execution, there were 2 intermediate files which kept
>>> growing until they reached about 30 GB.
>>> At that point, the Flink TaskManager threw the exception again:
>>>
>>> java.lang.RuntimeException: Error obtaining the sorted input: Thread
>>> 'SortMerger spilling thread' terminated due to an exception: No space left
>>> on device
>>>
>>> Here is an ls excerpt of the directory on the host (to which the
>>> TaskManager container was also writing successfully) shortly before the
>>> exception:
>>>
>>> *31G* 9d177a1971322263f1597c3378885ccf.channel
>>> *31G* a693811249bc5f785a79d1b1b537fe93.channel
>>>
>>> Now I believe the host system is capable of storing hundreds of GB more,
>>> so I am confused as to what the problem might be.
>>>
>>> Best regards,
>>>
>>> Miguel E. Coimbra
>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>> Skype: miguel.e.coimbra
>>>
>>>
>>>>
>>>> Hi Miguel,
>>>>
>>>> the exception does indeed indicate that the process ran out of
>>>> available disk space.
>>>> The quoted paragraph of the blog post describes the situation when you
>>>> receive the IOE.
>>>>
>>>> By default, the system's default tmp dir is used. I don't know which
>>>> folder that would be in a Docker setup.
>>>> You can configure the temp dir using the taskmanager.tmp.dirs config
>>>> key.
>>>> Please see the configuration documentation for details [1].
>>>>
>>>> Hope this helps,
>>>> Fabian
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/config.html#jobmanager-amp-taskmanager
>>>>
>>>> 2016-12-02 0:18 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>>>>
>>>>> Hello,
>>>>>
>>>>> I have a problem for which I hope someone will be able to give a hint.
>>>>> I am running the Flink *standalone* cluster with 2 Docker containers
>>>>> (1 TaskManager and 1 JobManager) using 1 TaskManager with 30 GB of RAM.
>>>>>
>>>>> The dataset is a large one: SNAP Friendster, which has around 1800 M
>>>>> edges.
>>>>> https://snap.stanford.edu/data/com-Friendster.html
>>>>>
>>>>> I am trying to run the Gelly built-in label propagation algorithm on
>>>>> top of it.
>>>>> As this is a very big dataset, I believe I am exceeding the available
>>>>> RAM and that the system is using secondary storage, which then fails:
>>>>>
>>>>>
>>>>> Connected to JobManager at Actor[akka.tcp://flink@172.19.0.2:6123/user/jobmanager#894624508]
>>>>> 12/01/2016 17:58:24    Job execution switched to status RUNNING.
>>>>> 12/01/2016 17:58:24    DataSource (at main(App.java:33) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to SCHEDULED
>>>>> 12/01/2016 17:58:24    DataSource (at main(App.java:33) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to DEPLOYING
>>>>> 12/01/2016 17:58:24    DataSource (at main(App.java:33) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to RUNNING
>>>>> 12/01/2016 17:58:24    Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to SCHEDULED
>>>>> 12/01/2016 17:58:24    Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to DEPLOYING
>>>>> 12/01/2016 17:58:24    Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to RUNNING
>>>>> 12/01/2016 17:59:51    Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to FAILED
>>>>> *java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device*
>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1098)
>>>>>     at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:86)
>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
>>>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>> *Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device*
>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>>>> Caused by: java.io.IOException: No space left on device
>>>>>     at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>>>>>     at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
>>>>>     at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>>>>>     at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>>>>>     at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:211)
>>>>>     at org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest.write(AsynchronousFileIOChannel.java:344)
>>>>>     at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:502)
>>>>>
>>>>>
>>>>> I do not have secondary storage limitations on the host system, so I
>>>>> believe the system would be able to handle whatever is spilled to the
>>>>> disk...
>>>>> Perhaps this is a Docker limitation regarding the usage of the host's
>>>>> secondary storage?
>>>>>
>>>>> Or is there perhaps some configuration or setting for the TaskManager
>>>>> which I am missing?
>>>>> Running the label propagation of Gelly on this dataset and cluster
>>>>> configuration, what would be the expected behavior if the system consumes
>>>>> all the memory?
>>>>>
>>>>>
>>>>> I believe the SortMerger thread is associated with the following
>>>>> mechanism described in this blog post:
>>>>>
>>>>> https://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html
>>>>> *The Sort-Merge-Join works by first sorting both input data sets on
>>>>> their join key attributes (Sort Phase) and merging the sorted data sets as
>>>>> a second step (Merge Phase). The sort is done in-memory if the local
>>>>> partition of a data set is small enough. Otherwise, an external merge-sort
>>>>> is done by collecting data until the working memory is filled, sorting it,
>>>>> writing the sorted data to the local filesystem, and starting over by
>>>>> filling the working memory again with more incoming data. After all input
>>>>> data has been received, sorted, and written as sorted runs to the local
>>>>> file system, a fully sorted stream can be obtained. This is done by reading
>>>>> the partially sorted runs from the local filesystem and sort-merging the
>>>>> records on the fly. Once the sorted streams of both inputs are available,
>>>>> both streams are sequentially read and merge-joined in a zig-zag fashion by
>>>>> comparing the sorted join key attributes, building join element pairs for
>>>>> matching keys, and advancing the sorted stream with the lower join key.*
>>>>>
>>>>> I am still investigating the possibility that Docker is at fault
>>>>> regarding secondary storage limitations, but how would I go about
>>>>> estimating the amount of disk space needed for this spilling on this
>>>>> dataset?
>>>>>
>>>>> Thanks for your time,
>>>>>
>>>>> My best regards,
>>>>>
>>>>> Miguel E. Coimbra
>>>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>>>> Skype: miguel.e.coimbra
>>>>
>>>>
>>>
>>
>>
>
