Hello Fabian,

So if I want to have 10 nodes with one working thread each, I would just
set this, I assume:

taskmanager.numberOfTaskSlots: 1
parallelism.default: 10

There is progress, albeit little.
I am now running on a directory with more space.
For 10 iterations of label propagation, I am getting this error at the end
(on the TaskManager).
I thought the execution was taking too much time, so I checked CPU usage of
the TaskManager and it was really low.
Checking the log on the TaskManager, I found this error at the bottom in
bold:


2016-12-09 09:46:00,305 INFO
org.apache.flink.runtime.taskmanager.Task                     - Freeing
task resources for IterationHead(Scatter-gather iteration
(org.apache.flink.graph.library.LabelPropagation$UpdateVertexLabel@21aa6d6c
|
org.apache.flink.graph.library.LabelPropagation$SendNewLabelToNeighbors@b968a76))
(1/1)
2016-12-09 09:46:00,305 INFO
org.apache.flink.runtime.taskmanager.Task                     - DataSink
(collect()) (1/1) switched to FINISHED
2016-12-09 09:46:00,305 INFO
org.apache.flink.runtime.taskmanager.Task                     - Freeing
task resources for DataSink (collect()) (1/1)
2016-12-09 09:46:00,317 INFO
org.apache.flink.runtime.taskmanager.TaskManager              -
Un-registering task and sending final execution state FINISHED to
JobManager for task IterationHead(Scatter-gather iteration
(org.apache.flink.graph.library.LabelPropagation$UpdateVertexLabel@21aa6d6c
|
org.apache.flink.graph.library.LabelPropagation$SendNewLabelToNeighbors@b968a76))
(89eb2508cbda679502c2e0b258068274)
2016-12-09 09:46:00,317 INFO
org.apache.flink.runtime.taskmanager.TaskManager              -
Un-registering task and sending final execution state FINISHED to
JobManager for task DataSink (collect()) (26b8f3950f4e736b0798d28c4bf967ed)
2016-12-09 09:46:04,080 ERROR
akka.remote.EndpointWriter                                    - Transient
association error (association remains live)
*akka.remote.OversizedPayloadException: Discarding oversized payload sent
to Actor[akka.tcp://flink@172.18.0.2:6123/user/jobmanager#1638751963
<http://flink@172.18.0.2:6123/user/jobmanager#1638751963>]: max allowed
size 10485760 bytes, actual size of encoded class
org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage
was 79885809 bytes.*

Do you have any idea what this might be?

Kind regards,

Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
Skype: miguel.e.coimbra

On 6 December 2016 at 19:57, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Miguel,
>
> estimating the space requirements is not trivial. It depends of course on
> the algorithm and the data itself. I'm not an expert for graph algorithms
> and don't know your datasets.
>
> But have you tried to run the algorithm in a non dockerized environment?
> That might help to figure out if this is an issue with the Docker
> configuration rather than Flink.
>
> Btw. If you want to run with a parallelism of 3 you need at least three
> slots, either 3 three slots in one TM or 1 slot in each of three TMs.
>
> Best,
> Fabian
>
> 2016-12-05 17:20 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>
>> Hello Fabian,
>>
>> Thanks for the attention. Still haven't solved this.
>> I did set up a cron job to clean the Docker images daily - thanks for
>> that hint.
>> As a last resort, I am going to look into a 2 TB NAS to see if this works.
>>
>> What is confusing me is that this happens also for the
>> com-orkut.ungraph.txt dataset which is much smaller than
>> com-friendster.ungraph.txt but not that much bigger than the
>> com-dblp.ungraph.txt.
>>
>> DBLP - ​I am able to run the DBLP on one TaskManager container.​
>> https://snap.stanford.edu/data/com-DBLP.html
>> Nodes 317080  ~0.3 M
>> Edges 1049866 ~ 1 M
>>
>> Orkut - no disk space error.
>> https://snap.stanford.edu/data/com-Orkut.html
>> Nodes 3072441 ~3 M
>> Edges 117185083 ~ 117 M
>>
>> ​Friendster - no disk space error.
>> https://snap.stanford.edu/data/com-Friendster.html
>> Nodes 65608366 ~65 M
>> Edges 1806067135 ~ 1800 M​
>>
>> For testing purposes, I'm using a JobManager (in its own Docker
>> container), a single TaskManager (in its own Docker container) with the
>> following config parameters:
>>
>> Heap is currently configured to 6 GB:
>> taskmanager.heap.mb: 6000
>>
>> Parallelism is set as such:
>>
>> taskmanager.numberOfTaskSlots: 1
>> parallelism.default: 1
>>
>> It is my understanding that if I want to test for example N = 3
>> TaskManagers (each in its own Docker container) with minimum parallelism
>> within each, I would use:
>>
>> taskmanager.numberOfTaskSlots: 1
>> parallelism.default: 3
>>
>>
>> Fabian, do you think you could help estimate how much disk space would be
>> required to compute the Orkut data set for example?
>> I am running a Flink 1.1.3 Docker cluster with a single TaskManager.
>> This is the code I am using to read SNAP datasets and to test with Orkut,
>> Friendster and DBLP, in case you have a minute to inspect it and see if
>> something is amiss:
>>
>> public class App {
>>     public static void main(String[] args) {
>>         final ExecutionEnvironment env = ExecutionEnvironment.getExecut
>> ionEnvironment();
>>         final String dataPath = args[0];
>>
>>         final DataSet<Tuple2<Long, Long>> edgeTuples =
>> env.readCsvFile(dataPath)
>>             .fieldDelimiter("\t") // node IDs are separated by spaces
>>             .ignoreComments("#")  // comments start with "#"
>>             .types(Long.class, Long.class);
>>
>>         // Dealing with an undirected graph, so we call .getUndirected()
>> at the end.
>>         final Graph<Long, Long, NullValue> graph =
>> Graph.fromTuple2DataSet(
>>             edgeTuples,
>>             new MapFunction<Long, Long>() {
>>                 private static final long serialVersionUID =
>> 8713516577419451509L;
>>                 private long test = 1L;
>>                 public Long map(Long value) {
>>                     return value;
>>                 }
>>             },
>>             env
>>         ).getUndirected();
>>
>>
>>         try {
>>             // Generate a unique ID value for each vertex.
>>             // Based on https://github.com/apache/flin
>> k/blob/master/flink-libraries/flink-gelly-examples/src/main/
>> java/org/apache/flink/graph/examples/MusicProfiles.java
>>             DataSet<Tuple2<Long, Long>> idsWithInitialLabels =
>> DataSetUtils.zipWithUniqueId(graph.getVertexIds())
>>                 .map(
>>                     new MapFunction<Tuple2<Long, Long>, Tuple2<Long,
>> Long>>() {
>>                         private static final long serialVersionUID =
>> -6348050104902440929L;
>>
>>                         @Override
>>                         public Tuple2<Long, Long> map(Tuple2<Long, Long>
>> tuple2) throws Exception {
>>                             return new Tuple2<Long, Long>(tuple2.f1,
>> tuple2.f0);
>>                         }
>>                     }
>>                 );
>>
>>             // Build the graph with initialization values.
>>             final Graph<Long, Long, NullValue> graphWithIDs = graph
>>                 .joinWithVertices(idsWithInitialLabels,
>>                     new VertexJoinFunction<Long, Long>() {
>>                         private static final long serialVersionUID =
>> -315275119763760820L;
>>                         public Long vertexJoin(Long vertexValue, Long
>> inputValue) {
>>                             return inputValue;
>>                         }
>>                 });
>>
>>             // Execute LabelPropagation over it.
>>             DataSet<Vertex<Long, Long>> result = graphWithIDs.run(new
>> LabelPropagation<Long, Long, NullValue>(10));
>>
>>             graph.getVertices().print();
>>
>>             TimeUnit.SECONDS.sleep(2);
>>
>>             System.out.println("graphWithIDs");
>>             graphWithIDs.getVertices().print();
>>             graphWithIDs.getEdges().print();
>>
>>             TimeUnit.SECONDS.sleep(2);
>>
>>             // Group vertices by similar communities.
>>             final List<Vertex<Long, Long>> collected = result.collect();
>>             final HashMap<Long, ArrayList<Long>> commSizes = new
>> HashMap<Long, ArrayList<Long>>();
>>             for(Vertex<Long, Long> v : collected) {
>>                 //System.out.println("collected[v] = id:" + v.getId() +
>> "\tval:" + v.getValue());
>>                 if(!commSizes.containsKey(v.getValue())) {
>>                     commSizes.put(v.getValue(), new ArrayList<Long>());
>>                 }
>>                 commSizes.get(v.getValue()).add(v.getId());
>>             }
>>
>>
>>             System.out.println("#communities:\t" +
>> commSizes.keySet().size() + "\n|result|:\t" + result.count() +
>> "\n|collected|:\t" + collected.size());
>>         } catch (Exception e) {
>>             e.printStackTrace();
>>         }
>>     }
>> }
>>
>> ​Thanks for your time,​
>>
>>
>> Miguel E. Coimbra
>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>> Skype: miguel.e.coimbra
>>
>> ---------- Forwarded message ----------
>>
>>> From: Fabian Hueske <
>>> ​​
>>> fhue...@gmail.com>
>>> To: user@flink.apache.org
>>> Cc:
>>> Date: Mon, 5 Dec 2016 08:40:04 +0100
>>> Subject:
>>> ​​
>>> Re: Thread 'SortMerger spilling thread' terminated due to an exception:
>>> No space left on device
>>>
>>> Hi Miguel,
>>>
>>> have you found a solution to your problem?
>>> I'm not a docker expert but this forum thread looks like could be
>>> related to your problem [1].
>>>
>>> Best,
>>> Fabian
>>>
>>> [1] https://forums.docker.com/t/no-space-left-on-device-error/10894
>>>
>>> 2016-12-02 17:43 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>>>
>>>> Hello Fabian,
>>>>
>>>> I have created a directory on my host machine user directory (
>>>> /home/myuser/mydir ) and I am mapping it as a volume with Docker for the
>>>> TaskManager and JobManager containers.
>>>> Each container will thus have the following directory /home/flink/htmp
>>>>
>>>> host ---> container
>>>> /home/myuser/mydir ---> /home/flink/htmp
>>>>
>>>> I had previously done this successfully with the a host directory which
>>>> holds several SNAP data sets.
>>>> In the Flink configuration file, I specified /home/flink/htmp to be
>>>> used as the tmp dir for the TaskManager.
>>>> This seems to be working, as I was able to start the cluster and invoke
>>>> Flink for that Friendster dataset.
>>>>
>>>> However, during execution, there were 2 intermediate files which kept
>>>> growing until they reached about 30 GB.
>>>> At that point, the Flink TaskManager threw the exception again:
>>>>
>>>> java.lang.RuntimeException: Error obtaining the sorted input: Thread
>>>> 'SortMerger spilling thread' terminated due to an exception: No space left
>>>> on device
>>>>
>>>> Here is an ls excerpt of the directory on the host (to which the
>>>> TaskManager container was also writing successfully) shortly before the
>>>> exception:
>>>>
>>>> *31G *9d177a1971322263f1597c3378885ccf.channel
>>>> *31G* a693811249bc5f785a79d1b1b537fe93.channel
>>>>
>>>> Now I believe the host system is capable of storing hundred GBs more,
>>>> so I am confused as to what the problem might be.
>>>>
>>>> Best regards,
>>>>
>>>> Miguel E. Coimbra
>>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>>> Skype: miguel.e.coimbra
>>>>
>>>> ​
>>>>>
>>>>> Hi Miguel,
>>>>>
>>>>> the exception does indeed indicate that the process ran out of
>>>>> available disk space.
>>>>> The quoted paragraph of the blog post describes the situation when you
>>>>> receive the IOE.
>>>>>
>>>>> By default the systems default tmp dir is used. I don't know which
>>>>> folder that would be in a Docker setup.
>>>>> You can configure the temp dir using the taskmanager.tmp.dirs config
>>>>> key.
>>>>> Please see the configuration documentation for details [1].
>>>>>
>>>>> Hope this helps,
>>>>> Fabian
>>>>>
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/
>>>>> setup/config.html#jobmanager-amp-taskmanager
>>>>>
>>>>> 2016-12-02 0:18 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>>>>> ​
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I have a problem for which I hope someone will be able to give a hint.
>>>>>> I am running the Flink *standalone* cluster with 2 Docker containers
>>>>>> (1 TaskManager and 1 JobManager) using 1 TaskManager with 30 GB of RAM.
>>>>>>
>>>>>> The dataset is a large one: SNAP Friendster, which has around 1800 M
>>>>>> edges.
>>>>>> https://snap.stanford.edu/data/com-Friendster.html
>>>>>>
>>>>>> I am trying to run the Gelly built-in label propagation algorithm on
>>>>>> top of it.
>>>>>> As this is a very big dataset, I believe I am exceeding the available
>>>>>> RAM and that the system is using secondary storage, which then fails:
>>>>>>
>>>>>>
>>>>>> Connected to JobManager at Actor[akka.tcp://flink@172.19.
>>>>>> 0.2:6123/user/jobmanager#894624508]
>>>>>> 12/01/2016 17:58:24    Job execution switched to status RUNNING.
>>>>>> 12/01/2016 17:58:24    DataSource (at main(App.java:33) (
>>>>>> org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
>>>>>> SCHEDULED
>>>>>> 12/01/2016 17:58:24    DataSource (at main(App.java:33) (
>>>>>> org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
>>>>>> DEPLOYING
>>>>>> 12/01/2016 17:58:24    DataSource (at main(App.java:33) (
>>>>>> org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to
>>>>>> RUNNING
>>>>>> 12/01/2016 17:58:24    Map (Map at 
>>>>>> fromTuple2DataSet(Graph.java:343))(1/1)
>>>>>> switched to SCHEDULED
>>>>>> 12/01/2016 17:58:24    Map (Map at 
>>>>>> fromTuple2DataSet(Graph.java:343))(1/1)
>>>>>> switched to DEPLOYING
>>>>>> 12/01/2016 17:58:24    Map (Map at 
>>>>>> fromTuple2DataSet(Graph.java:343))(1/1)
>>>>>> switched to RUNNING
>>>>>> 12/01/2016 17:59:51    Map (Map at 
>>>>>> fromTuple2DataSet(Graph.java:343))(1/1)
>>>>>> switched to FAILED
>>>>>> *java.lang.RuntimeException: Error obtaining the sorted input: Thread
>>>>>> 'SortMerger spilling thread' terminated due to an exception: No space 
>>>>>> left
>>>>>> on device*
>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>>> .getIterator(UnilateralSortMerger.java:619)
>>>>>>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT
>>>>>> ask.java:1098)
>>>>>>     at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.j
>>>>>> ava:86)
>>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.j
>>>>>> ava:486)
>>>>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTas
>>>>>> k.java:351)
>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>> *Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
>>>>>> terminated due to an exception: No space left on device*
>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>>> $ThreadBase.run(UnilateralSortMerger.java:800)
>>>>>> Caused by: java.io.IOException: No space left on device
>>>>>>     at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>>>>>>     at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:
>>>>>> 60)
>>>>>>     at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>>>>>>     at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>>>>>>     at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:211)
>>>>>>     at org.apache.flink.runtime.io.disk.iomanager.SegmentWriteReque
>>>>>> st.write(AsynchronousFileIOChannel.java:344)
>>>>>>     at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$Wr
>>>>>> iterThread.run(IOManagerAsync.java:502)
>>>>>>
>>>>>>
>>>>>> I do not have secondary storage limitations on the host system, so I
>>>>>> believe the system would be able to handle whatever is spilled to the
>>>>>> disk...
>>>>>> Perhaps this is a Docker limitation regarding the usage of the host's
>>>>>> secondary storage?
>>>>>>
>>>>>> Or is there perhaps some configuration or setting for the TaskManager
>>>>>> which I am missing?
>>>>>> Running the label propagation of Gelly on this dataset and cluster
>>>>>> configuration, what would be the expected behavior if the system consumes
>>>>>> all the memory?
>>>>>>
>>>>>>
>>>>>> I believe the SortMerger thread is associated to the following
>>>>>> mechanism described in this blog post:
>>>>>>
>>>>>> https://flink.apache.org/news/2015/03/13/peeking-into-Apache
>>>>>> -Flinks-Engine-Room.html
>>>>>> *The Sort-Merge-Join works by first sorting both input data sets on
>>>>>> their join key attributes (Sort Phase) and merging the sorted data sets 
>>>>>> as
>>>>>> a second step (Merge Phase). The sort is done in-memory if the local
>>>>>> partition of a data set is small enough. Otherwise, an external 
>>>>>> merge-sort
>>>>>> is done by collecting data until the working memory is filled, sorting 
>>>>>> it,
>>>>>> writing the sorted data to the local filesystem, and starting over by
>>>>>> filling the working memory again with more incoming data. After all input
>>>>>> data has been received, sorted, and written as sorted runs to the local
>>>>>> file system, a fully sorted stream can be obtained. This is done by 
>>>>>> reading
>>>>>> the partially sorted runs from the local filesystem and sort-merging the
>>>>>> records on the fly. Once the sorted streams of both inputs are available,
>>>>>> both streams are sequentially read and merge-joined in a zig-zag fashion 
>>>>>> by
>>>>>> comparing the sorted join key attributes, building join element pairs for
>>>>>> matching keys, and advancing the sorted stream with the lower join key.*
>>>>>>
>>>>>> I am still investigating the possibility that Docker is at fault
>>>>>> regarding secondary storage limitations, but how would I go about
>>>>>> estimating the amount of disk space needed for this spilling on this
>>>>>> dataset?
>>>>>>
>>>>>> Thanks for your time,
>>>>>>
>>>>>> My best regards,
>>>>>>
>>>>>> Miguel E. Coimbra
>>>>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>>>>> Skype: miguel.e.coimbra
>>>>>
>>>>>
>>>>
>>>
>>>
>>
>

Reply via email to