It looks like the result you are trying to fetch with collect() is too large. collect() only works for results up to about 10 MB, because the result is shipped back through the JobManager in a single Akka message, which is capped at the default akka.framesize of 10485760 bytes - exactly the limit you see in the log below.
I would write the result to a file and read that file back in.
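For example, a minimal sketch with the DataSet API (the output path and delimiters here are placeholders; I haven't run this):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.library.LabelPropagation;
import org.apache.flink.types.NullValue;

// ... inside your job, instead of result.collect():
DataSet<Vertex<Long, Long>> result =
    graphWithIDs.run(new LabelPropagation<Long, Long, NullValue>(10));

// Write the labels to the file system instead of shipping them to the client.
result.writeAsCsv("file:///home/flink/htmp/labels", "\n", "\t", WriteMode.OVERWRITE);

// File sinks are lazy, so trigger the job explicitly.
env.execute("Label propagation");

If you really need collect(), you can raise the frame size in flink-conf.yaml (e.g. akka.framesize: 104857600b), but writing to a file scales much better.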
Best,
Fabian

2016-12-09 16:30 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:

> Hello Fabian,
>
> So if I want to have 10 nodes with one working thread each, I would just set this, I assume:
>
> taskmanager.numberOfTaskSlots: 1
> parallelism.default: 10
>
> There is progress, albeit little. I am now running on a directory with more space.
> For 10 iterations of label propagation, I am getting this error at the end (on the TaskManager).
> I thought the execution was taking too long, so I checked the CPU usage of the TaskManager and it was really low.
> Checking the log on the TaskManager, I found this error at the bottom:
>
> 2016-12-09 09:46:00,305 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.LabelPropagation$UpdateVertexLabel@21aa6d6c | org.apache.flink.graph.library.LabelPropagation$SendNewLabelToNeighbors@b968a76)) (1/1)
> 2016-12-09 09:46:00,305 INFO org.apache.flink.runtime.taskmanager.Task - DataSink (collect()) (1/1) switched to FINISHED
> 2016-12-09 09:46:00,305 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for DataSink (collect()) (1/1)
> 2016-12-09 09:46:00,317 INFO org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state FINISHED to JobManager for task IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.LabelPropagation$UpdateVertexLabel@21aa6d6c | org.apache.flink.graph.library.LabelPropagation$SendNewLabelToNeighbors@b968a76)) (89eb2508cbda679502c2e0b258068274)
> 2016-12-09 09:46:00,317 INFO org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state FINISHED to JobManager for task DataSink (collect()) (26b8f3950f4e736b0798d28c4bf967ed)
> 2016-12-09 09:46:04,080 ERROR akka.remote.EndpointWriter - Transient association error (association remains live)
> akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@172.18.0.2:6123/user/jobmanager#1638751963]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage was 79885809 bytes.
>
> Do you have any idea what this might be?
>
> Kind regards,
>
> Miguel E. Coimbra
> Email: miguel.e.coim...@gmail.com
> Skype: miguel.e.coimbra
>
> On 6 December 2016 at 19:57, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Miguel,
>>
>> estimating the space requirements is not trivial. It depends of course on the algorithm and on the data itself. I'm not an expert on graph algorithms and don't know your datasets.
>>
>> But have you tried to run the algorithm in a non-dockerized environment? That might help to figure out whether this is an issue with the Docker configuration rather than with Flink.
>>
>> Btw., if you want to run with a parallelism of 3 you need at least three slots: either 3 slots in one TM, or 1 slot in each of three TMs.
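>> For example, with the keys you are already using (just the relevant lines):
>>
>> # one TaskManager offering 3 slots
>> taskmanager.numberOfTaskSlots: 3
>> parallelism.default: 3
>>
>> # ... or three TaskManagers offering 1 slot each
>> taskmanager.numberOfTaskSlots: 1
>> parallelism.default: 3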
>> Best,
>> Fabian
>>
>> 2016-12-05 17:20 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>>
>>> Hello Fabian,
>>>
>>> Thanks for the attention. I still haven't solved this.
>>> I did set up a cron job to clean the Docker images daily - thanks for that hint.
>>> As a last resort, I am going to look into a 2 TB NAS to see if this works.
>>>
>>> What is confusing me is that this happens also for the com-orkut.ungraph.txt dataset, which is much smaller than com-friendster.ungraph.txt but not that much bigger than com-dblp.ungraph.txt.
>>>
>>> DBLP - I am able to run this one on a single TaskManager container.
>>> https://snap.stanford.edu/data/com-DBLP.html
>>> Nodes: 317080 (~0.3 M)
>>> Edges: 1049866 (~1 M)
>>>
>>> Orkut - fails with the "No space left on device" error.
>>> https://snap.stanford.edu/data/com-Orkut.html
>>> Nodes: 3072441 (~3 M)
>>> Edges: 117185083 (~117 M)
>>>
>>> Friendster - fails with the "No space left on device" error.
>>> https://snap.stanford.edu/data/com-Friendster.html
>>> Nodes: 65608366 (~65 M)
>>> Edges: 1806067135 (~1800 M)
>>>
>>> For testing purposes, I'm using a JobManager (in its own Docker container) and a single TaskManager (in its own Docker container) with the following config parameters.
>>>
>>> The heap is currently configured to 6 GB:
>>>
>>> taskmanager.heap.mb: 6000
>>>
>>> Parallelism is set as such:
>>>
>>> taskmanager.numberOfTaskSlots: 1
>>> parallelism.default: 1
>>>
>>> It is my understanding that if I want to test, for example, N = 3 TaskManagers (each in its own Docker container) with minimum parallelism within each, I would use:
>>>
>>> taskmanager.numberOfTaskSlots: 1
>>> parallelism.default: 3
>>>
>>> Fabian, do you think you could help estimate how much disk space would be required to compute the Orkut dataset, for example?
>>> I am running a Flink 1.1.3 Docker cluster with a single TaskManager.
>>> This is the code I am using to read SNAP datasets and to test with Orkut, Friendster and DBLP, in case you have a minute to inspect it and see if something is amiss:
>>>
>>> import java.util.ArrayList;
>>> import java.util.HashMap;
>>> import java.util.List;
>>> import java.util.concurrent.TimeUnit;
>>>
>>> import org.apache.flink.api.common.functions.MapFunction;
>>> import org.apache.flink.api.java.DataSet;
>>> import org.apache.flink.api.java.ExecutionEnvironment;
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.flink.api.java.utils.DataSetUtils;
>>> import org.apache.flink.graph.Graph;
>>> import org.apache.flink.graph.Vertex;
>>> import org.apache.flink.graph.VertexJoinFunction;
>>> import org.apache.flink.graph.library.LabelPropagation;
>>> import org.apache.flink.types.NullValue;
>>>
>>> public class App {
>>>     public static void main(String[] args) {
>>>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>         final String dataPath = args[0];
>>>
>>>         final DataSet<Tuple2<Long, Long>> edgeTuples = env.readCsvFile(dataPath)
>>>             .fieldDelimiter("\t")   // node IDs are separated by tabs
>>>             .ignoreComments("#")    // comments start with "#"
>>>             .types(Long.class, Long.class);
>>>
>>>         // Dealing with an undirected graph, so we call .getUndirected() at the end.
>>>         final Graph<Long, Long, NullValue> graph = Graph.fromTuple2DataSet(
>>>             edgeTuples,
>>>             // Initialize each vertex value with its own ID.
>>>             new MapFunction<Long, Long>() {
>>>                 private static final long serialVersionUID = 8713516577419451509L;
>>>                 public Long map(Long value) {
>>>                     return value;
>>>                 }
>>>             },
>>>             env
>>>         ).getUndirected();
>>>
>>>         try {
>>>             // Generate a unique ID value for each vertex. Based on
>>>             // https://github.com/apache/flink/blob/master/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/MusicProfiles.java
>>>             DataSet<Tuple2<Long, Long>> idsWithInitialLabels =
>>>                 DataSetUtils.zipWithUniqueId(graph.getVertexIds())
>>>                     .map(new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
>>>                         private static final long serialVersionUID = -6348050104902440929L;
>>>
>>>                         @Override
>>>                         public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple2) throws Exception {
>>>                             return new Tuple2<Long, Long>(tuple2.f1, tuple2.f0);
>>>                         }
>>>                     });
>>>
>>>             // Build the graph with initialization values.
>>>             final Graph<Long, Long, NullValue> graphWithIDs = graph
>>>                 .joinWithVertices(idsWithInitialLabels,
>>>                     new VertexJoinFunction<Long, Long>() {
>>>                         private static final long serialVersionUID = -315275119763760820L;
>>>                         public Long vertexJoin(Long vertexValue, Long inputValue) {
>>>                             return inputValue;
>>>                         }
>>>                     });
>>>
>>>             // Execute LabelPropagation over it.
>>>             DataSet<Vertex<Long, Long>> result =
>>>                 graphWithIDs.run(new LabelPropagation<Long, Long, NullValue>(10));
>>>
>>>             graph.getVertices().print();
>>>
>>>             TimeUnit.SECONDS.sleep(2);
>>>
>>>             System.out.println("graphWithIDs");
>>>             graphWithIDs.getVertices().print();
>>>             graphWithIDs.getEdges().print();
>>>
>>>             TimeUnit.SECONDS.sleep(2);
>>>
>>>             // Group vertices by community.
>>>             final List<Vertex<Long, Long>> collected = result.collect();
>>>             final HashMap<Long, ArrayList<Long>> commSizes = new HashMap<Long, ArrayList<Long>>();
>>>             for (Vertex<Long, Long> v : collected) {
>>>                 if (!commSizes.containsKey(v.getValue())) {
>>>                     commSizes.put(v.getValue(), new ArrayList<Long>());
>>>                 }
>>>                 commSizes.get(v.getValue()).add(v.getId());
>>>             }
>>>
>>>             System.out.println("#communities:\t" + commSizes.keySet().size()
>>>                 + "\n|result|:\t" + result.count()
>>>                 + "\n|collected|:\t" + collected.size());
>>>         } catch (Exception e) {
>>>             e.printStackTrace();
>>>         }
>>>     }
>>> }
>>>
>>> Thanks for your time,
>>>
>>> Miguel E. Coimbra
>>> Email: miguel.e.coim...@gmail.com
>>> Skype: miguel.e.coimbra
>>>
>>> ---------- Forwarded message ----------
>>>> From: Fabian Hueske <fhue...@gmail.com>
>>>> To: user@flink.apache.org
>>>> Cc:
>>>> Date: Mon, 5 Dec 2016 08:40:04 +0100
>>>> Subject: Re: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device
>>>>
>>>> Hi Miguel,
>>>>
>>>> have you found a solution to your problem?
>>>> I'm not a Docker expert, but this forum thread looks like it could be related to your problem [1].
>>>>
>>>> Best,
>>>> Fabian
>>>>
>>>> [1] https://forums.docker.com/t/no-space-left-on-device-error/10894
>>>>
>>>> 2016-12-02 17:43 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>>>>
>>>>> Hello Fabian,
>>>>>
>>>>> I have created a directory under my user directory on the host machine (/home/myuser/mydir) and I am mapping it as a volume with Docker for the TaskManager and JobManager containers.
>>>>> Each container will thus see it as the directory /home/flink/htmp:
>>>>>
>>>>> host ---> container
>>>>> /home/myuser/mydir ---> /home/flink/htmp
>>>>>
>>>>> I had previously done this successfully with a host directory which holds several SNAP datasets.
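>>>>> (Roughly like this - the image name here is a placeholder for the one I actually run, and the remaining options are elided:)
>>>>>
>>>>> docker run -v /home/myuser/mydir:/home/flink/htmp ... <flink-taskmanager-image>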
>>>>> In the Flink configuration file, I specified /home/flink/htmp as the tmp dir for the TaskManager.
>>>>> This seems to be working, as I was able to start the cluster and invoke Flink on the Friendster dataset.
>>>>>
>>>>> However, during execution there were 2 intermediate files which kept growing until they reached about 30 GB.
>>>>> At that point, the Flink TaskManager threw the exception again:
>>>>>
>>>>> java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device
>>>>>
>>>>> Here is an ls excerpt of the directory on the host (to which the TaskManager container was also writing successfully) shortly before the exception:
>>>>>
>>>>> 31G  9d177a1971322263f1597c3378885ccf.channel
>>>>> 31G  a693811249bc5f785a79d1b1b537fe93.channel
>>>>>
>>>>> Now, I believe the host system is capable of storing hundreds of GB more, so I am confused as to what the problem might be.
>>>>>
>>>>> Best regards,
>>>>>
>>>>> Miguel E. Coimbra
>>>>> Email: miguel.e.coim...@gmail.com
>>>>> Skype: miguel.e.coimbra
>>>>>
>>>>>> Hi Miguel,
>>>>>>
>>>>>> the exception does indeed indicate that the process ran out of available disk space.
>>>>>> The quoted paragraph of the blog post describes the situation in which you receive the IOE.
>>>>>>
>>>>>> By default, the system's default tmp dir is used. I don't know which folder that would be in a Docker setup.
>>>>>> You can configure the temp dir using the taskmanager.tmp.dirs config key.
>>>>>> Please see the configuration documentation for details [1].
>>>>>>
>>>>>> Hope this helps,
>>>>>> Fabian
>>>>>>
>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/config.html#jobmanager-amp-taskmanager
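>>>>>> For example, pointing it at the volume you already mount (a sketch; you can also list several directories separated by ':' to spread the spill files over multiple disks):
>>>>>>
>>>>>> taskmanager.tmp.dirs: /home/flink/htmp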
>>>>>>> 12/01/2016 17:58:24 DataSource (at main(App.java:33) ( >>>>>>> org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to >>>>>>> SCHEDULED >>>>>>> 12/01/2016 17:58:24 DataSource (at main(App.java:33) ( >>>>>>> org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to >>>>>>> DEPLOYING >>>>>>> 12/01/2016 17:58:24 DataSource (at main(App.java:33) ( >>>>>>> org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to >>>>>>> RUNNING >>>>>>> 12/01/2016 17:58:24 Map (Map at >>>>>>> fromTuple2DataSet(Graph.java:343))(1/1) >>>>>>> switched to SCHEDULED >>>>>>> 12/01/2016 17:58:24 Map (Map at >>>>>>> fromTuple2DataSet(Graph.java:343))(1/1) >>>>>>> switched to DEPLOYING >>>>>>> 12/01/2016 17:58:24 Map (Map at >>>>>>> fromTuple2DataSet(Graph.java:343))(1/1) >>>>>>> switched to RUNNING >>>>>>> 12/01/2016 17:59:51 Map (Map at >>>>>>> fromTuple2DataSet(Graph.java:343))(1/1) >>>>>>> switched to FAILED >>>>>>> *java.lang.RuntimeException: Error obtaining the sorted input: >>>>>>> Thread 'SortMerger spilling thread' terminated due to an exception: No >>>>>>> space left on device* >>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger >>>>>>> .getIterator(UnilateralSortMerger.java:619) >>>>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT >>>>>>> ask.java:1098) >>>>>>> at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.j >>>>>>> ava:86) >>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.j >>>>>>> ava:486) >>>>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTas >>>>>>> k.java:351) >>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585) >>>>>>> at java.lang.Thread.run(Thread.java:745) >>>>>>> *Caused by: java.io.IOException: Thread 'SortMerger spilling thread' >>>>>>> terminated due to an exception: No space left on device* >>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger >>>>>>> $ThreadBase.run(UnilateralSortMerger.java:800) >>>>>>> Caused by: java.io.IOException: No space left on device >>>>>>> at sun.nio.ch.FileDispatcherImpl.write0(Native Method) >>>>>>> at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java: >>>>>>> 60) >>>>>>> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) >>>>>>> at sun.nio.ch.IOUtil.write(IOUtil.java:65) >>>>>>> at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:211) >>>>>>> at org.apache.flink.runtime.io.disk.iomanager.SegmentWriteReque >>>>>>> st.write(AsynchronousFileIOChannel.java:344) >>>>>>> at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$Wr >>>>>>> iterThread.run(IOManagerAsync.java:502) >>>>>>> >>>>>>> >>>>>>> I do not have secondary storage limitations on the host system, so I >>>>>>> believe the system would be able to handle whatever is spilled to the >>>>>>> disk... >>>>>>> Perhaps this is a Docker limitation regarding the usage of the >>>>>>> host's secondary storage? >>>>>>> >>>>>>> Or is there perhaps some configuration or setting for the >>>>>>> TaskManager which I am missing? >>>>>>> Running the label propagation of Gelly on this dataset and cluster >>>>>>> configuration, what would be the expected behavior if the system >>>>>>> consumes >>>>>>> all the memory? 
>>>>>>> I believe the SortMerger thread is associated with the mechanism described in this blog post:
>>>>>>> https://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html
>>>>>>>
>>>>>>> "The Sort-Merge-Join works by first sorting both input data sets on their join key attributes (Sort Phase) and merging the sorted data sets as a second step (Merge Phase). The sort is done in-memory if the local partition of a data set is small enough. Otherwise, an external merge-sort is done by collecting data until the working memory is filled, sorting it, writing the sorted data to the local filesystem, and starting over by filling the working memory again with more incoming data. After all input data has been received, sorted, and written as sorted runs to the local file system, a fully sorted stream can be obtained. This is done by reading the partially sorted runs from the local filesystem and sort-merging the records on the fly. Once the sorted streams of both inputs are available, both streams are sequentially read and merge-joined in a zig-zag fashion by comparing the sorted join key attributes, building join element pairs for matching keys, and advancing the sorted stream with the lower join key."
>>>>>>>
>>>>>>> I am still investigating the possibility that Docker is at fault regarding secondary storage limitations, but how would I go about estimating the amount of disk space needed for this spilling on this dataset?
>>>>>>>
>>>>>>> Thanks for your time,
>>>>>>>
>>>>>>> My best regards,
>>>>>>>
>>>>>>> Miguel E. Coimbra
>>>>>>> Email: miguel.e.coim...@gmail.com
>>>>>>> Skype: miguel.e.coimbra
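>>>>>>> P.S. My own rough back-of-the-envelope attempt, assuming each edge spills as a plain serialized Tuple2<Long, Long> (16 bytes, ignoring any framing or sorting overhead):
>>>>>>>
>>>>>>> long edges = 1_806_067_135L;             // Friendster edge count
>>>>>>> long undirected = 2 * edges;             // getUndirected() doubles the edge set
>>>>>>> long bytesPerEdge = 2 * Long.BYTES;      // two 8-byte longs per edge
>>>>>>> long spillBytes = undirected * bytesPerEdge;
>>>>>>> // ~58 GB for one full sorted spill of the edge set - and an iterative job
>>>>>>> // may materialize several data sets of this size at the same time, which
>>>>>>> // would be consistent with the two ~31 GB .channel files I observed.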