It looks like the result you are trying to fetch with collect() is too
large.
collect() only works for results up to 10 MB (the 10485760-byte limit
shown in your log).

I would write the result to a file and read that file in.
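
As a rough sketch of the "read that file in" step: assuming the job
replaces result.collect() with a file sink such as
result.writeAsCsv(path, "\n", "\t") followed by env.execute(), and that
the output ends up in a single file (which is typically the case when
the sink runs with parallelism 1), the community grouping could be done
by a plain Java reader along these lines. The class name
CommunityReader, its method names, and the tab-separated
"vertexId<TAB>label" layout are illustrative assumptions, not part of
your code or of Flink.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: groups vertex IDs by community label outside of Flink.
final class CommunityReader {
    private CommunityReader() {}

    // Parses one "vertexId<TAB>label" line (assumed layout) into the map.
    private static void addLine(String line, Map<Long, List<Long>> communities) {
        if (line.isEmpty()) {
            return; // skip blank lines
        }
        String[] fields = line.split("\t");
        long vertexId = Long.parseLong(fields[0].trim());
        long label = Long.parseLong(fields[1].trim());
        communities.computeIfAbsent(label, k -> new ArrayList<>()).add(vertexId);
    }

    // Groups already-loaded lines; mainly useful for small inputs and tests.
    static Map<Long, List<Long>> groupByLabel(Iterable<String> lines) {
        Map<Long, List<Long>> communities = new HashMap<>();
        for (String line : lines) {
            addLine(line, communities);
        }
        return communities;
    }

    // Streams the file line by line so the raw text is never held in memory at once.
    static Map<Long, List<Long>> readCommunities(Path file) throws IOException {
        Map<Long, List<Long>> communities = new HashMap<>();
        try (BufferedReader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            String line;
            while ((line = reader.readLine()) != null) {
                addLine(line, communities);
            }
        }
        return communities;
    }

    public static void main(String[] args) throws IOException {
        // args[0]: path of the file written by the Flink job (assumption).
        Map<Long, List<Long>> communities = readCommunities(Paths.get(args[0]));
        System.out.println("#communities:\t" + communities.size());
    }
}
```

This way only the file path travels between the job and the client, so
the result never has to fit into a single message to the JobManager.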

Best, Fabian

2016-12-09 16:30 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:

> Hello Fabian,
>
> So if I want to have 10 nodes with one working thread each, I would just
> set this, I assume:
>
> taskmanager.numberOfTaskSlots: 1
> parallelism.default: 10
>
> There is progress, albeit little.
> I am now running on a directory with more space.
> For 10 iterations of label propagation, I am getting this error at the end
> (on the TaskManager).
> I thought the execution was taking too much time, so I checked CPU usage
> of the TaskManager and it was really low.
> Checking the log on the TaskManager, I found this error at the bottom in
> bold:
>
>
> 2016-12-09 09:46:00,305 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.LabelPropagation$UpdateVertexLabel@21aa6d6c | org.apache.flink.graph.library.LabelPropagation$SendNewLabelToNeighbors@b968a76)) (1/1)
> 2016-12-09 09:46:00,305 INFO  org.apache.flink.runtime.taskmanager.Task                     - DataSink (collect()) (1/1) switched to FINISHED
> 2016-12-09 09:46:00,305 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for DataSink (collect()) (1/1)
> 2016-12-09 09:46:00,317 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FINISHED to JobManager for task IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.LabelPropagation$UpdateVertexLabel@21aa6d6c | org.apache.flink.graph.library.LabelPropagation$SendNewLabelToNeighbors@b968a76)) (89eb2508cbda679502c2e0b258068274)
> 2016-12-09 09:46:00,317 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FINISHED to JobManager for task DataSink (collect()) (26b8f3950f4e736b0798d28c4bf967ed)
> 2016-12-09 09:46:04,080 ERROR akka.remote.EndpointWriter                                 - Transient association error (association remains live)
> *akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@172.18.0.2:6123/user/jobmanager#1638751963]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage was 79885809 bytes.*
>
> Do you have any idea what this might be?
>
> Kind regards,
>
> Miguel E. Coimbra
> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
> Skype: miguel.e.coimbra
>
> On 6 December 2016 at 19:57, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Miguel,
>>
>> estimating the space requirements is not trivial. It depends of course on
>> the algorithm and the data itself. I'm not an expert on graph algorithms
>> and don't know your datasets.
>>
>> But have you tried to run the algorithm in a non-Dockerized environment?
>> That might help to figure out if this is an issue with the Docker
>> configuration rather than Flink.
>>
>> Btw, if you want to run with a parallelism of 3 you need at least three
>> slots, either three slots in one TM or one slot in each of three TMs.
>>
>> Best,
>> Fabian
>>
>> 2016-12-05 17:20 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>>
>>> Hello Fabian,
>>>
>>> Thanks for the attention. Still haven't solved this.
>>> I did set up a cron job to clean the Docker images daily - thanks for
>>> that hint.
>>> As a last resort, I am going to look into a 2 TB NAS to see if this
>>> works.
>>>
>>> What is confusing me is that this happens also for the
>>> com-orkut.ungraph.txt dataset which is much smaller than
>>> com-friendster.ungraph.txt but not that much bigger than the
>>> com-dblp.ungraph.txt.
>>>
>>> DBLP - I am able to run the DBLP on one TaskManager container.
>>> https://snap.stanford.edu/data/com-DBLP.html
>>> Nodes 317080  ~0.3 M
>>> Edges 1049866 ~ 1 M
>>>
>>> Orkut - fails with the "No space left on device" error.
>>> https://snap.stanford.edu/data/com-Orkut.html
>>> Nodes 3072441 ~3 M
>>> Edges 117185083 ~ 117 M
>>>
>>> Friendster - fails with the "No space left on device" error.
>>> https://snap.stanford.edu/data/com-Friendster.html
>>> Nodes 65608366 ~65 M
>>> Edges 1806067135 ~ 1800 M
>>>
>>> For testing purposes, I'm using a JobManager (in its own Docker
>>> container), a single TaskManager (in its own Docker container) with the
>>> following config parameters:
>>>
>>> Heap is currently configured to 6 GB:
>>> taskmanager.heap.mb: 6000
>>>
>>> Parallelism is set as such:
>>>
>>> taskmanager.numberOfTaskSlots: 1
>>> parallelism.default: 1
>>>
>>> It is my understanding that if I want to test for example N = 3
>>> TaskManagers (each in its own Docker container) with minimum parallelism
>>> within each, I would use:
>>>
>>> taskmanager.numberOfTaskSlots: 1
>>> parallelism.default: 3
>>>
>>>
>>> Fabian, do you think you could help estimate how much disk space would
>>> be required to compute the Orkut data set for example?
>>> I am running a Flink 1.1.3 Docker cluster with a single TaskManager.
>>> This is the code I am using to read SNAP datasets and to test with
>>> Orkut, Friendster and DBLP, in case you have a minute to inspect it and see
>>> if something is amiss:
>>>
>>> import java.util.ArrayList;
>>> import java.util.HashMap;
>>> import java.util.List;
>>> import java.util.concurrent.TimeUnit;
>>>
>>> import org.apache.flink.api.common.functions.MapFunction;
>>> import org.apache.flink.api.java.DataSet;
>>> import org.apache.flink.api.java.ExecutionEnvironment;
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.flink.api.java.utils.DataSetUtils;
>>> import org.apache.flink.graph.Graph;
>>> import org.apache.flink.graph.Vertex;
>>> import org.apache.flink.graph.VertexJoinFunction;
>>> import org.apache.flink.graph.library.LabelPropagation;
>>> import org.apache.flink.types.NullValue;
>>>
>>> public class App {
>>>     public static void main(String[] args) {
>>>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>         final String dataPath = args[0];
>>>
>>>         final DataSet<Tuple2<Long, Long>> edgeTuples = env.readCsvFile(dataPath)
>>>             .fieldDelimiter("\t") // node IDs are separated by tabs
>>>             .ignoreComments("#")  // comments start with "#"
>>>             .types(Long.class, Long.class);
>>>
>>>         // Dealing with an undirected graph, so we call .getUndirected() at the end.
>>>         final Graph<Long, Long, NullValue> graph = Graph.fromTuple2DataSet(
>>>             edgeTuples,
>>>             // Initialize each vertex value with its own ID.
>>>             new MapFunction<Long, Long>() {
>>>                 private static final long serialVersionUID = 8713516577419451509L;
>>>                 private long test = 1L;
>>>                 public Long map(Long value) {
>>>                     return value;
>>>                 }
>>>             },
>>>             env
>>>         ).getUndirected();
>>>
>>>
>>>         try {
>>>             // Generate a unique ID value for each vertex.
>>>             // Based on https://github.com/apache/flink/blob/master/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/MusicProfiles.java
>>>             DataSet<Tuple2<Long, Long>> idsWithInitialLabels = DataSetUtils.zipWithUniqueId(graph.getVertexIds())
>>>                 .map(
>>>                     // Swap the fields so that each tuple is (vertex ID, unique label).
>>>                     new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
>>>                         private static final long serialVersionUID = -6348050104902440929L;
>>>
>>>                         @Override
>>>                         public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple2) throws Exception {
>>>                             return new Tuple2<Long, Long>(tuple2.f1, tuple2.f0);
>>>                         }
>>>                     }
>>>                 );
>>>
>>>             // Build the graph with initialization values.
>>>             final Graph<Long, Long, NullValue> graphWithIDs = graph
>>>                 .joinWithVertices(idsWithInitialLabels,
>>>                     new VertexJoinFunction<Long, Long>() {
>>>                         private static final long serialVersionUID = -315275119763760820L;
>>>                         public Long vertexJoin(Long vertexValue, Long inputValue) {
>>>                             return inputValue;
>>>                         }
>>>                 });
>>>
>>>             // Execute LabelPropagation over it.
>>>             DataSet<Vertex<Long, Long>> result = graphWithIDs.run(new LabelPropagation<Long, Long, NullValue>(10));
>>>
>>>             graph.getVertices().print();
>>>
>>>             TimeUnit.SECONDS.sleep(2);
>>>
>>>             System.out.println("graphWithIDs");
>>>             graphWithIDs.getVertices().print();
>>>             graphWithIDs.getEdges().print();
>>>
>>>             TimeUnit.SECONDS.sleep(2);
>>>
>>>             // Group vertices by similar communities.
>>>             final List<Vertex<Long, Long>> collected = result.collect();
>>>             final HashMap<Long, ArrayList<Long>> commSizes = new HashMap<Long, ArrayList<Long>>();
>>>             for(Vertex<Long, Long> v : collected) {
>>>                 //System.out.println("collected[v] = id:" + v.getId() + "\tval:" + v.getValue());
>>>                 if(!commSizes.containsKey(v.getValue())) {
>>>                     commSizes.put(v.getValue(), new ArrayList<Long>());
>>>                 }
>>>                 commSizes.get(v.getValue()).add(v.getId());
>>>             }
>>>
>>>
>>>             System.out.println("#communities:\t" + commSizes.keySet().size() + "\n|result|:\t" + result.count() + "\n|collected|:\t" + collected.size());
>>>         } catch (Exception e) {
>>>             e.printStackTrace();
>>>         }
>>>     }
>>> }
>>>
>>> Thanks for your time,
>>>
>>>
>>> Miguel E. Coimbra
>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>> Skype: miguel.e.coimbra
>>>
>>> ---------- Forwarded message ----------
>>>
>>>> From: Fabian Hueske <fhue...@gmail.com>
>>>> To: user@flink.apache.org
>>>> Cc:
>>>> Date: Mon, 5 Dec 2016 08:40:04 +0100
>>>> Subject: Re: Thread 'SortMerger spilling thread' terminated due to an
>>>> exception: No space left on device
>>>>
>>>> Hi Miguel,
>>>>
>>>> have you found a solution to your problem?
>>>> I'm not a Docker expert, but this forum thread looks like it could be
>>>> related to your problem [1].
>>>>
>>>> Best,
>>>> Fabian
>>>>
>>>> [1] https://forums.docker.com/t/no-space-left-on-device-error/10894
>>>>
>>>> 2016-12-02 17:43 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>>>>
>>>>> Hello Fabian,
>>>>>
>>>>> I have created a directory on my host machine user directory (
>>>>> /home/myuser/mydir ) and I am mapping it as a volume with Docker for the
>>>>> TaskManager and JobManager containers.
>>>>> Each container will thus have the following directory /home/flink/htmp
>>>>>
>>>>> host ---> container
>>>>> /home/myuser/mydir ---> /home/flink/htmp
>>>>>
>>>>> I had previously done this successfully with a host directory
>>>>> which holds several SNAP data sets.
>>>>> In the Flink configuration file, I specified /home/flink/htmp to be
>>>>> used as the tmp dir for the TaskManager.
>>>>> This seems to be working, as I was able to start the cluster and
>>>>> invoke Flink for that Friendster dataset.
>>>>>
>>>>> However, during execution, there were 2 intermediate files which kept
>>>>> growing until they reached about 30 GB.
>>>>> At that point, the Flink TaskManager threw the exception again:
>>>>>
>>>>> java.lang.RuntimeException: Error obtaining the sorted input: Thread
>>>>> 'SortMerger spilling thread' terminated due to an exception: No space left
>>>>> on device
>>>>>
>>>>> Here is an ls excerpt of the directory on the host (to which the
>>>>> TaskManager container was also writing successfully) shortly before the
>>>>> exception:
>>>>>
>>>>> *31G *9d177a1971322263f1597c3378885ccf.channel
>>>>> *31G* a693811249bc5f785a79d1b1b537fe93.channel
>>>>>
>>>>> Now I believe the host system is capable of storing hundreds of GB more,
>>>>> so I am confused as to what the problem might be.
>>>>>
>>>>> Best regards,
>>>>>
>>>>> Miguel E. Coimbra
>>>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>>>> Skype: miguel.e.coimbra
>>>>>
>>>>>
>>>>>>
>>>>>> Hi Miguel,
>>>>>>
>>>>>> the exception does indeed indicate that the process ran out of
>>>>>> available disk space.
>>>>>> The quoted paragraph of the blog post describes the situation when
>>>>>> you receive the IOE.
>>>>>>
>>>>>> By default, the system's default tmp dir is used. I don't know which
>>>>>> folder that would be in a Docker setup.
>>>>>> You can configure the temp dir using the taskmanager.tmp.dirs config
>>>>>> key.
>>>>>> Please see the configuration documentation for details [1].
>>>>>>
>>>>>> Hope this helps,
>>>>>> Fabian
>>>>>>
>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/config.html#jobmanager-amp-taskmanager
>>>>>>
>>>>>> 2016-12-02 0:18 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I have a problem for which I hope someone will be able to give a
>>>>>>> hint.
>>>>>>> I am running the Flink *standalone* cluster with 2 Docker
>>>>>>> containers (1 TaskManager and 1 JobManager) using 1 TaskManager with
>>>>>>> 30 GB of RAM.
>>>>>>>
>>>>>>> The dataset is a large one: SNAP Friendster, which has around 1800 M
>>>>>>> edges.
>>>>>>> https://snap.stanford.edu/data/com-Friendster.html
>>>>>>>
>>>>>>> I am trying to run the Gelly built-in label propagation algorithm on
>>>>>>> top of it.
>>>>>>> As this is a very big dataset, I believe I am exceeding the
>>>>>>> available RAM and that the system is using secondary storage, which then
>>>>>>> fails:
>>>>>>>
>>>>>>>
>>>>>>> Connected to JobManager at Actor[akka.tcp://flink@172.19.0.2:6123/user/jobmanager#894624508]
>>>>>>> 12/01/2016 17:58:24    Job execution switched to status RUNNING.
>>>>>>> 12/01/2016 17:58:24    DataSource (at main(App.java:33) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to SCHEDULED
>>>>>>> 12/01/2016 17:58:24    DataSource (at main(App.java:33) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to DEPLOYING
>>>>>>> 12/01/2016 17:58:24    DataSource (at main(App.java:33) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to RUNNING
>>>>>>> 12/01/2016 17:58:24    Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to SCHEDULED
>>>>>>> 12/01/2016 17:58:24    Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to DEPLOYING
>>>>>>> 12/01/2016 17:58:24    Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to RUNNING
>>>>>>> 12/01/2016 17:59:51    Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to FAILED
>>>>>>> *java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device*
>>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1098)
>>>>>>>     at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:86)
>>>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
>>>>>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>> *Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device*
>>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>>>>>> Caused by: java.io.IOException: No space left on device
>>>>>>>     at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>>>>>>>     at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
>>>>>>>     at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>>>>>>>     at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>>>>>>>     at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:211)
>>>>>>>     at org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest.write(AsynchronousFileIOChannel.java:344)
>>>>>>>     at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:502)
>>>>>>>
>>>>>>>
>>>>>>> I do not have secondary storage limitations on the host system, so I
>>>>>>> believe the system would be able to handle whatever is spilled to the
>>>>>>> disk...
>>>>>>> Perhaps this is a Docker limitation regarding the usage of the
>>>>>>> host's secondary storage?
>>>>>>>
>>>>>>> Or is there perhaps some configuration or setting for the
>>>>>>> TaskManager which I am missing?
>>>>>>> Running the label propagation of Gelly on this dataset and cluster
>>>>>>> configuration, what would be the expected behavior if the system
>>>>>>> consumes all the memory?
>>>>>>>
>>>>>>>
>>>>>>> I believe the SortMerger thread is associated with the following
>>>>>>> mechanism described in this blog post:
>>>>>>>
>>>>>>> https://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html
>>>>>>> *The Sort-Merge-Join works by first sorting both input data sets on
>>>>>>> their join key attributes (Sort Phase) and merging the sorted data
>>>>>>> sets as a second step (Merge Phase). The sort is done in-memory if
>>>>>>> the local partition of a data set is small enough. Otherwise, an
>>>>>>> external merge-sort is done by collecting data until the working
>>>>>>> memory is filled, sorting it, writing the sorted data to the local
>>>>>>> filesystem, and starting over by filling the working memory again
>>>>>>> with more incoming data. After all input data has been received,
>>>>>>> sorted, and written as sorted runs to the local file system, a fully
>>>>>>> sorted stream can be obtained. This is done by reading the partially
>>>>>>> sorted runs from the local filesystem and sort-merging the records
>>>>>>> on the fly. Once the sorted streams of both inputs are available,
>>>>>>> both streams are sequentially read and merge-joined in a zig-zag
>>>>>>> fashion by comparing the sorted join key attributes, building join
>>>>>>> element pairs for matching keys, and advancing the sorted stream
>>>>>>> with the lower join key.*
>>>>>>>
>>>>>>> I am still investigating the possibility that Docker is at fault
>>>>>>> regarding secondary storage limitations, but how would I go about
>>>>>>> estimating the amount of disk space needed for this spilling on this
>>>>>>> dataset?
>>>>>>>
>>>>>>> Thanks for your time,
>>>>>>>
>>>>>>> My best regards,
>>>>>>>
>>>>>>> Miguel E. Coimbra
>>>>>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>>>>>> Skype: miguel.e.coimbra
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>
