Hello Fabian,

Thanks for your attention. I still haven't solved this.
I did set up a cron job to clean the Docker images daily - thanks for that
hint.
As a last resort, I am going to look into a 2 TB NAS to see if this works.
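In case it helps anyone else, the cleanup amounts to a crontab entry along these lines (a sketch of what I set up; the 04:00 schedule and the dangling-only filter are my own choices):

```
# Remove only dangling (untagged) Docker image layers, daily at 04:00.
0 4 * * * docker rmi $(docker images -q --filter "dangling=true") 2>/dev/null
```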

What is confusing me is that this also happens for the com-orkut.ungraph.txt
dataset, which is much smaller than com-friendster.ungraph.txt but not that
much bigger than com-dblp.ungraph.txt.

DBLP - runs fine on a single TaskManager container.
https://snap.stanford.edu/data/com-DBLP.html
Nodes 317080 (~0.3 M)
Edges 1049866 (~1 M)

Orkut - fails with the "No space left on device" error.
https://snap.stanford.edu/data/com-Orkut.html
Nodes 3072441 (~3 M)
Edges 117185083 (~117 M)

Friendster - fails with the "No space left on device" error.
https://snap.stanford.edu/data/com-Friendster.html
Nodes 65608366 (~65 M)
Edges 1806067135 (~1800 M)
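To try to answer my own disk-space question below, here is a rough
back-of-envelope sketch (my own assumption: 16 serialized bytes per
Tuple2<Long, Long> edge, doubled by getUndirected(), ignoring Flink's
per-record and sort-run overhead):

```python
# Back-of-envelope spill estimate: each edge is a Tuple2<Long, Long>,
# i.e. 2 * 8 bytes when serialized; getUndirected() doubles the edge set.
BYTES_PER_EDGE = 2 * 8  # assumption: no per-record overhead counted


def spill_estimate_gb(edges, undirected_doubling=True, overhead=1.0):
    """Estimate GB spilled for one fully sorted copy of the edge set."""
    n = edges * (2 if undirected_doubling else 1)
    return n * BYTES_PER_EDGE * overhead / (1024 ** 3)


for name, edges in [("DBLP", 1_049_866),
                    ("Orkut", 117_185_083),
                    ("Friendster", 1_806_067_135)]:
    print(f"{name}: ~{spill_estimate_gb(edges):.2f} GB per sorted copy")
```

For Friendster this predicts roughly 54 GB for a single sorted copy of the
edge set, which is at least in the same ballpark as the two ~31 GB .channel
files I saw spilled in the earlier run (quoted below); the joins and the
iteration presumably multiply this further.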

For testing purposes, I'm using a JobManager (in its own Docker container)
and a single TaskManager (in its own Docker container) with the following
config parameters:

Heap is currently configured to 6 GB:
taskmanager.heap.mb: 6000

Parallelism is set as such:

taskmanager.numberOfTaskSlots: 1
parallelism.default: 1

It is my understanding that if I want to test, for example, N = 3
TaskManagers (each in its own Docker container) with minimum parallelism
within each, I would use:

taskmanager.numberOfTaskSlots: 1
parallelism.default: 3


Fabian, do you think you could help estimate how much disk space would be
required to compute the Orkut dataset, for example?
I am running a Flink 1.1.3 Docker cluster with a single TaskManager.
This is the code I am using to read SNAP datasets and to test with Orkut,
Friendster and DBLP, in case you have a minute to inspect it and see if
something is amiss:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.VertexJoinFunction;
import org.apache.flink.graph.library.LabelPropagation;
import org.apache.flink.types.NullValue;

public class App {
    public static void main(String[] args) {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        final String dataPath = args[0];

        final DataSet<Tuple2<Long, Long>> edgeTuples = env.readCsvFile(dataPath)
            .fieldDelimiter("\t") // node IDs are separated by tabs
            .ignoreComments("#")  // comments start with "#"
            .types(Long.class, Long.class);

        // Dealing with an undirected graph, so we call .getUndirected() at the end.
        final Graph<Long, Long, NullValue> graph = Graph.fromTuple2DataSet(
            edgeTuples,
            new MapFunction<Long, Long>() {
                private static final long serialVersionUID = 8713516577419451509L;

                public Long map(Long value) {
                    return value;
                }
            },
            env
        ).getUndirected();

        try {
            // Generate a unique ID value for each vertex.
            // Based on
            // https://github.com/apache/flink/blob/master/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/MusicProfiles.java
            DataSet<Tuple2<Long, Long>> idsWithInitialLabels =
                DataSetUtils.zipWithUniqueId(graph.getVertexIds())
                    .map(new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                        private static final long serialVersionUID = -6348050104902440929L;

                        @Override
                        public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple2) throws Exception {
                            return new Tuple2<Long, Long>(tuple2.f1, tuple2.f0);
                        }
                    });

            // Build the graph with initialization values.
            final Graph<Long, Long, NullValue> graphWithIDs = graph
                .joinWithVertices(idsWithInitialLabels,
                    new VertexJoinFunction<Long, Long>() {
                        private static final long serialVersionUID = -315275119763760820L;

                        public Long vertexJoin(Long vertexValue, Long inputValue) {
                            return inputValue;
                        }
                    });

            // Execute LabelPropagation over it.
            DataSet<Vertex<Long, Long>> result = graphWithIDs.run(
                new LabelPropagation<Long, Long, NullValue>(10));

            graph.getVertices().print();

            TimeUnit.SECONDS.sleep(2);

            System.out.println("graphWithIDs");
            graphWithIDs.getVertices().print();
            graphWithIDs.getEdges().print();

            TimeUnit.SECONDS.sleep(2);

            // Group vertices by community label.
            final List<Vertex<Long, Long>> collected = result.collect();
            final HashMap<Long, ArrayList<Long>> commSizes = new HashMap<Long, ArrayList<Long>>();
            for (Vertex<Long, Long> v : collected) {
                if (!commSizes.containsKey(v.getValue())) {
                    commSizes.put(v.getValue(), new ArrayList<Long>());
                }
                commSizes.get(v.getValue()).add(v.getId());
            }

            System.out.println("#communities:\t" + commSizes.keySet().size()
                + "\n|result|:\t" + result.count()
                + "\n|collected|:\t" + collected.size());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
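For completeness, these are the relevant flink-conf.yaml lines on the
TaskManager side (the tmp dir is the container path I map to a host volume,
as described in the quoted messages below):

```yaml
# Spill directory for the SortMerger (container path, mapped to the host)
taskmanager.tmp.dirs: /home/flink/htmp
taskmanager.heap.mb: 6000
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
```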

Thanks for your time,


Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
Skype: miguel.e.coimbra

---------- Forwarded message ----------

> From: Fabian Hueske <fhue...@gmail.com>
> To: user@flink.apache.org
> Date: Mon, 5 Dec 2016 08:40:04 +0100
> Subject: Re: Thread 'SortMerger spilling thread' terminated due to an
> exception: No space left on device
> Hi Miguel,
>
> have you found a solution to your problem?
> I'm not a Docker expert, but this forum thread looks like it could be
> related to your problem [1].
>
> Best,
> Fabian
>
> [1] https://forums.docker.com/t/no-space-left-on-device-error/10894
>
> 2016-12-02 17:43 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>
>> Hello Fabian,
>>
>> I have created a directory on my host machine user directory (
>> /home/myuser/mydir ) and I am mapping it as a volume with Docker for the
>> TaskManager and JobManager containers.
>> Each container will thus have the following directory /home/flink/htmp
>>
>> host ---> container
>> /home/myuser/mydir ---> /home/flink/htmp
>>
>> I had previously done this successfully with a host directory which
>> holds several SNAP data sets.
>> In the Flink configuration file, I specified /home/flink/htmp to be used
>> as the tmp dir for the TaskManager.
>> This seems to be working, as I was able to start the cluster and invoke
>> Flink for that Friendster dataset.
>>
>> However, during execution, there were 2 intermediate files which kept
>> growing until they reached about 30 GB.
>> At that point, the Flink TaskManager threw the exception again:
>>
>> java.lang.RuntimeException: Error obtaining the sorted input: Thread
>> 'SortMerger spilling thread' terminated due to an exception: No space left
>> on device
>>
>> Here is an ls excerpt of the directory on the host (to which the
>> TaskManager container was also writing successfully) shortly before the
>> exception:
>>
>> *31G* 9d177a1971322263f1597c3378885ccf.channel
>> *31G* a693811249bc5f785a79d1b1b537fe93.channel
>>
>> Now I believe the host system is capable of storing hundred GBs more, so
>> I am confused as to what the problem might be.
>>
>> Best regards,
>>
>> Miguel E. Coimbra
>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>> Skype: miguel.e.coimbra
>>
>>>
>>> Hi Miguel,
>>>
>>> the exception does indeed indicate that the process ran out of available
>>> disk space.
>>> The quoted paragraph of the blog post describes the situation when you
>>> receive the IOE.
>>>
>>> By default the systems default tmp dir is used. I don't know which
>>> folder that would be in a Docker setup.
>>> You can configure the temp dir using the taskmanager.tmp.dirs config
>>> key.
>>> Please see the configuration documentation for details [1].
>>>
>>> Hope this helps,
>>> Fabian
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/
>>> setup/config.html#jobmanager-amp-taskmanager
>>>
>>> 2016-12-02 0:18 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>>>
>>>> Hello,
>>>>
>>>> I have a problem for which I hope someone will be able to give a hint.
>>>> I am running the Flink *standalone* cluster with 2 Docker containers
>>>> (1 TaskManager and 1 JobManager) using 1 TaskManager with 30 GB of RAM.
>>>>
>>>> The dataset is a large one: SNAP Friendster, which has around 1800 M
>>>> edges.
>>>> https://snap.stanford.edu/data/com-Friendster.html
>>>>
>>>> I am trying to run the Gelly built-in label propagation algorithm on
>>>> top of it.
>>>> As this is a very big dataset, I believe I am exceeding the available
>>>> RAM and that the system is using secondary storage, which then fails:
>>>>
>>>>
>>>> Connected to JobManager at Actor[akka.tcp://flink@172.19.
>>>> 0.2:6123/user/jobmanager#894624508]
>>>> 12/01/2016 17:58:24    Job execution switched to status RUNNING.
>>>> 12/01/2016 17:58:24    DataSource (at main(App.java:33) (
>>>> org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
>>>> SCHEDULED
>>>> 12/01/2016 17:58:24    DataSource (at main(App.java:33) (
>>>> org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
>>>> DEPLOYING
>>>> 12/01/2016 17:58:24    DataSource (at main(App.java:33) (
>>>> org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
>>>> RUNNING
>>>> 12/01/2016 17:58:24    Map (Map at fromTuple2DataSet(Graph.java:343))(1/1)
>>>> switched to SCHEDULED
>>>> 12/01/2016 17:58:24    Map (Map at fromTuple2DataSet(Graph.java:343))(1/1)
>>>> switched to DEPLOYING
>>>> 12/01/2016 17:58:24    Map (Map at fromTuple2DataSet(Graph.java:343))(1/1)
>>>> switched to RUNNING
>>>> 12/01/2016 17:59:51    Map (Map at fromTuple2DataSet(Graph.java:343))(1/1)
>>>> switched to FAILED
>>>> *java.lang.RuntimeException: Error obtaining the sorted input: Thread
>>>> 'SortMerger spilling thread' terminated due to an exception: No space left
>>>> on device*
>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>> .getIterator(UnilateralSortMerger.java:619)
>>>>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT
>>>> ask.java:1098)
>>>>     at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.j
>>>> ava:86)
>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.j
>>>> ava:486)
>>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTas
>>>> k.java:351)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>> *Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
>>>> terminated due to an exception: No space left on device*
>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>> $ThreadBase.run(UnilateralSortMerger.java:800)
>>>> Caused by: java.io.IOException: No space left on device
>>>>     at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>>>>     at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
>>>>     at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>>>>     at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>>>>     at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:211)
>>>>     at org.apache.flink.runtime.io.disk.iomanager.SegmentWriteReque
>>>> st.write(AsynchronousFileIOChannel.java:344)
>>>>     at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$Wr
>>>> iterThread.run(IOManagerAsync.java:502)
>>>>
>>>>
>>>> I do not have secondary storage limitations on the host system, so I
>>>> believe the system would be able to handle whatever is spilled to the
>>>> disk...
>>>> Perhaps this is a Docker limitation regarding the usage of the host's
>>>> secondary storage?
>>>>
>>>> Or is there perhaps some configuration or setting for the TaskManager
>>>> which I am missing?
>>>> Running the label propagation of Gelly on this dataset and cluster
>>>> configuration, what would be the expected behavior if the system consumes
>>>> all the memory?
>>>>
>>>>
>>>> I believe the SortMerger thread is associated to the following
>>>> mechanism described in this blog post:
>>>>
>>>> https://flink.apache.org/news/2015/03/13/peeking-into-Apache
>>>> -Flinks-Engine-Room.html
>>>> *The Sort-Merge-Join works by first sorting both input data sets on
>>>> their join key attributes (Sort Phase) and merging the sorted data sets as
>>>> a second step (Merge Phase). The sort is done in-memory if the local
>>>> partition of a data set is small enough. Otherwise, an external merge-sort
>>>> is done by collecting data until the working memory is filled, sorting it,
>>>> writing the sorted data to the local filesystem, and starting over by
>>>> filling the working memory again with more incoming data. After all input
>>>> data has been received, sorted, and written as sorted runs to the local
>>>> file system, a fully sorted stream can be obtained. This is done by reading
>>>> the partially sorted runs from the local filesystem and sort-merging the
>>>> records on the fly. Once the sorted streams of both inputs are available,
>>>> both streams are sequentially read and merge-joined in a zig-zag fashion by
>>>> comparing the sorted join key attributes, building join element pairs for
>>>> matching keys, and advancing the sorted stream with the lower join key.*
>>>>
>>>> I am still investigating the possibility that Docker is at fault
>>>> regarding secondary storage limitations, but how would I go about
>>>> estimating the amount of disk space needed for this spilling on this
>>>> dataset?
>>>>
>>>> Thanks for your time,
>>>>
>>>> My best regards,
>>>>
>>>> Miguel E. Coimbra
>>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>>> Skype: miguel.e.coimbra
>>>
>>>
>>
>
>
