Hello Fabian,

So if I want to have 10 nodes with one worker thread each, I would just set this, I assume:

taskmanager.numberOfTaskSlots: 1
parallelism.default: 10
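(If I understood your earlier explanation correctly, the total number of slots is numberOfTaskSlots multiplied by the number of TaskManagers, so 10 TaskManagers with 1 slot each would give exactly the 10 slots needed for a default parallelism of 10.)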
There is progress, albeit little. I am now running on a directory with more space. For 10 iterations of label propagation, I am getting this error at the end (on the TaskManager). I thought the execution was taking too much time, so I checked the CPU usage of the TaskManager and it was really low. Checking the log on the TaskManager, I found this error at the bottom, in bold:

2016-12-09 09:46:00,305 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.LabelPropagation$UpdateVertexLabel@21aa6d6c | org.apache.flink.graph.library.LabelPropagation$SendNewLabelToNeighbors@b968a76)) (1/1)
2016-12-09 09:46:00,305 INFO org.apache.flink.runtime.taskmanager.Task - DataSink (collect()) (1/1) switched to FINISHED
2016-12-09 09:46:00,305 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for DataSink (collect()) (1/1)
2016-12-09 09:46:00,317 INFO org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state FINISHED to JobManager for task IterationHead(Scatter-gather iteration (org.apache.flink.graph.library.LabelPropagation$UpdateVertexLabel@21aa6d6c | org.apache.flink.graph.library.LabelPropagation$SendNewLabelToNeighbors@b968a76)) (89eb2508cbda679502c2e0b258068274)
2016-12-09 09:46:00,317 INFO org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state FINISHED to JobManager for task DataSink (collect()) (26b8f3950f4e736b0798d28c4bf967ed)
2016-12-09 09:46:04,080 ERROR akka.remote.EndpointWriter - Transient association error (association remains live)
*akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@172.18.0.2:6123/user/jobmanager#1638751963]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage was 79885809 bytes.*

Do you have any idea what this might be?
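In the meantime, I found the akka.framesize option in the configuration documentation; its default (10485760b) matches the "max allowed size 10485760 bytes" in the message, so as a stopgap I assume I could raise it in flink-conf.yaml, along the lines of:

akka.framesize: 104857600b

Although if the oversized message is the collect() result being shipped back through the JobManager, then writing the result to a file instead of collecting it would probably be the proper fix. (Just my guess from reading the error.)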
Kind regards,

Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com
Skype: miguel.e.coimbra

On 6 December 2016 at 19:57, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Miguel,
>
> estimating the space requirements is not trivial. It depends of course on
> the algorithm and the data itself. I'm not an expert on graph algorithms
> and don't know your datasets.
>
> But have you tried to run the algorithm in a non-dockerized environment?
> That might help to figure out whether this is an issue with the Docker
> configuration rather than with Flink.
>
> Btw. if you want to run with a parallelism of 3, you need at least three
> slots: either three slots in one TM, or one slot in each of three TMs.
>
> Best,
> Fabian
>
> 2016-12-05 17:20 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>
>> Hello Fabian,
>>
>> Thanks for the attention. I still haven't solved this.
>> I did set up a cron job to clean the Docker images daily - thanks for
>> that hint.
>> As a last resort, I am going to look into a 2 TB NAS to see if this works.
>>
>> What is confusing me is that this happens also for the
>> com-orkut.ungraph.txt dataset, which is much smaller than
>> com-friendster.ungraph.txt but not that much bigger than
>> com-dblp.ungraph.txt.
>>
>> DBLP - I am able to run the DBLP dataset on one TaskManager container.
>> https://snap.stanford.edu/data/com-DBLP.html
>> Nodes: 317080 (~0.3 M)
>> Edges: 1049866 (~1 M)
>>
>> Orkut - disk space error.
>> https://snap.stanford.edu/data/com-Orkut.html
>> Nodes: 3072441 (~3 M)
>> Edges: 117185083 (~117 M)
>>
>> Friendster - disk space error.
>> https://snap.stanford.edu/data/com-Friendster.html
>> Nodes: 65608366 (~65 M)
>> Edges: 1806067135 (~1800 M)
>>
>> For testing purposes, I'm using a JobManager (in its own Docker
>> container) and a single TaskManager (in its own Docker container) with
>> the following config parameters.
>>
>> Heap is currently configured to 6 GB:
>>
>> taskmanager.heap.mb: 6000
>>
>> Parallelism is set as such:
>>
>> taskmanager.numberOfTaskSlots: 1
>> parallelism.default: 1
>>
>> It is my understanding that if I want to test, for example, N = 3
>> TaskManagers (each in its own Docker container) with minimum parallelism
>> within each, I would use:
>>
>> taskmanager.numberOfTaskSlots: 1
>> parallelism.default: 3
>>
>> Fabian, do you think you could help estimate how much disk space would be
>> required to compute the Orkut dataset, for example?
>> I am running a Flink 1.1.3 Docker cluster with a single TaskManager.
>> This is the code I am using to read SNAP datasets and to test with Orkut,
>> Friendster and DBLP, in case you have a minute to inspect it and see if
>> something is amiss:
>>
>> import java.util.ArrayList;
>> import java.util.HashMap;
>> import java.util.List;
>> import java.util.concurrent.TimeUnit;
>>
>> import org.apache.flink.api.common.functions.MapFunction;
>> import org.apache.flink.api.java.DataSet;
>> import org.apache.flink.api.java.ExecutionEnvironment;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.api.java.utils.DataSetUtils;
>> import org.apache.flink.graph.Graph;
>> import org.apache.flink.graph.Vertex;
>> import org.apache.flink.graph.VertexJoinFunction;
>> import org.apache.flink.graph.library.LabelPropagation;
>> import org.apache.flink.types.NullValue;
>>
>> public class App {
>>     public static void main(String[] args) {
>>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>         final String dataPath = args[0];
>>
>>         final DataSet<Tuple2<Long, Long>> edgeTuples = env.readCsvFile(dataPath)
>>                 .fieldDelimiter("\t")  // node IDs are separated by tabs
>>                 .ignoreComments("#")   // comments start with "#"
>>                 .types(Long.class, Long.class);
>>
>>         // Dealing with an undirected graph, so we call .getUndirected() at the end.
>>         final Graph<Long, Long, NullValue> graph = Graph.fromTuple2DataSet(
>>                 edgeTuples,
>>                 new MapFunction<Long, Long>() {
>>                     private static final long serialVersionUID = 8713516577419451509L;
>>                     @Override
>>                     public Long map(Long value) {
>>                         return value;
>>                     }
>>                 },
>>                 env
>>         ).getUndirected();
>>
>>         try {
>>             // Generate a unique ID value for each vertex.
>>             // Based on https://github.com/apache/flink/blob/master/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/MusicProfiles.java
>>             DataSet<Tuple2<Long, Long>> idsWithInitialLabels = DataSetUtils
>>                     .zipWithUniqueId(graph.getVertexIds())
>>                     .map(new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
>>                         private static final long serialVersionUID = -6348050104902440929L;
>>                         @Override
>>                         public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple2) throws Exception {
>>                             return new Tuple2<Long, Long>(tuple2.f1, tuple2.f0);
>>                         }
>>                     });
>>
>>             // Build the graph with initialization values.
>>             final Graph<Long, Long, NullValue> graphWithIDs = graph
>>                     .joinWithVertices(idsWithInitialLabels,
>>                             new VertexJoinFunction<Long, Long>() {
>>                                 private static final long serialVersionUID = -315275119763760820L;
>>                                 @Override
>>                                 public Long vertexJoin(Long vertexValue, Long inputValue) {
>>                                     return inputValue;
>>                                 }
>>                             });
>>
>>             // Execute LabelPropagation over it.
>>             DataSet<Vertex<Long, Long>> result = graphWithIDs.run(
>>                     new LabelPropagation<Long, Long, NullValue>(10));
>>
>>             graph.getVertices().print();
>>
>>             TimeUnit.SECONDS.sleep(2);
>>
>>             System.out.println("graphWithIDs");
>>             graphWithIDs.getVertices().print();
>>             graphWithIDs.getEdges().print();
>>
>>             TimeUnit.SECONDS.sleep(2);
>>
>>             // Group vertices by community.
>>             final List<Vertex<Long, Long>> collected = result.collect();
>>             final HashMap<Long, ArrayList<Long>> commSizes = new HashMap<Long, ArrayList<Long>>();
>>             for (Vertex<Long, Long> v : collected) {
>>                 // System.out.println("collected[v] = id:" + v.getId() + "\tval:" + v.getValue());
>>                 if (!commSizes.containsKey(v.getValue())) {
>>                     commSizes.put(v.getValue(), new ArrayList<Long>());
>>                 }
>>                 commSizes.get(v.getValue()).add(v.getId());
>>             }
>>
>>             System.out.println("#communities:\t" + commSizes.keySet().size()
>>                     + "\n|result|:\t" + result.count()
>>                     + "\n|collected|:\t" + collected.size());
>>         } catch (Exception e) {
>>             e.printStackTrace();
>>         }
>>     }
>> }
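>> By the way, one thing I am not sure about: since print(), collect() and
>> count() each trigger their own execution of the program, this code runs
>> the whole pipeline several times over, which would also multiply the
>> intermediate data that gets spilled. If that is indeed the case, I could
>> for example drop the result.count() call and reuse the list I already
>> collected:
>>
>>             // Reuse the already-collected result instead of triggering
>>             // another full execution with result.count().
>>             System.out.println("#communities:\t" + commSizes.keySet().size()
>>                     + "\n|collected|:\t" + collected.size());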
>> Thanks for your time,
>>
>> Miguel E. Coimbra
>> Email: miguel.e.coim...@gmail.com
>> Skype: miguel.e.coimbra
>>
>> ---------- Forwarded message ----------
>>
>>> From: Fabian Hueske <fhue...@gmail.com>
>>> To: user@flink.apache.org
>>> Date: Mon, 5 Dec 2016 08:40:04 +0100
>>> Subject: Re: Thread 'SortMerger spilling thread' terminated due to an
>>> exception: No space left on device
>>>
>>> Hi Miguel,
>>>
>>> have you found a solution to your problem?
>>> I'm not a Docker expert, but this forum thread looks like it could be
>>> related to your problem [1].
>>>
>>> Best,
>>> Fabian
>>>
>>> [1] https://forums.docker.com/t/no-space-left-on-device-error/10894
>>>
>>> 2016-12-02 17:43 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>>>
>>>> Hello Fabian,
>>>>
>>>> I have created a directory in my host machine's user directory
>>>> (/home/myuser/mydir) and I am mapping it as a volume with Docker for
>>>> the TaskManager and JobManager containers.
>>>> Each container will thus have the following directory: /home/flink/htmp
>>>>
>>>> host ---> container
>>>> /home/myuser/mydir ---> /home/flink/htmp
>>>>
>>>> I had previously done this successfully with a host directory which
>>>> holds several SNAP datasets.
>>>> In the Flink configuration file, I specified /home/flink/htmp to be
>>>> used as the tmp dir for the TaskManager.
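>>>> Concretely, each container gets the volume mapped at startup (the
>>>> invocation below is just a sketch of my setup, other flags elided):
>>>>
>>>> docker run -v /home/myuser/mydir:/home/flink/htmp ...
>>>>
>>>> and the TaskManager's flink-conf.yaml contains:
>>>>
>>>> taskmanager.tmp.dirs: /home/flink/htmp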
>>>> This seems to be working, as I was able to start the cluster and invoke
>>>> Flink on the Friendster dataset.
>>>>
>>>> However, during execution, there were 2 intermediate files which kept
>>>> growing until they reached about 30 GB each.
>>>> At that point, the Flink TaskManager threw the exception again:
>>>>
>>>> java.lang.RuntimeException: Error obtaining the sorted input: Thread
>>>> 'SortMerger spilling thread' terminated due to an exception: No space
>>>> left on device
>>>>
>>>> Here is an ls excerpt of the directory on the host (to which the
>>>> TaskManager container was also writing successfully) shortly before the
>>>> exception:
>>>>
>>>> 31G 9d177a1971322263f1597c3378885ccf.channel
>>>> 31G a693811249bc5f785a79d1b1b537fe93.channel
>>>>
>>>> Now I believe the host system is capable of storing hundreds of GB
>>>> more, so I am confused as to what the problem might be.
>>>>
>>>> Best regards,
>>>>
>>>> Miguel E. Coimbra
>>>> Email: miguel.e.coim...@gmail.com
>>>> Skype: miguel.e.coimbra
>>>>
>>>>> Hi Miguel,
>>>>>
>>>>> the exception does indeed indicate that the process ran out of
>>>>> available disk space.
>>>>> The quoted paragraph of the blog post describes the situation in which
>>>>> you receive the IOE.
>>>>>
>>>>> By default, the system's default tmp dir is used. I don't know which
>>>>> folder that would be in a Docker setup.
>>>>> You can configure the temp dir using the taskmanager.tmp.dirs config
>>>>> key.
>>>>> Please see the configuration documentation for details [1].
>>>>>
>>>>> Hope this helps,
>>>>> Fabian
>>>>>
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/config.html#jobmanager-amp-taskmanager
>>>>>
>>>>> 2016-12-02 0:18 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I have a problem for which I hope someone will be able to give a hint.
>>>>>> I am running the Flink *standalone* cluster with 2 Docker containers
>>>>>> (1 JobManager and 1 TaskManager), the TaskManager having 30 GB of RAM.
>>>>>>
>>>>>> The dataset is a large one: SNAP Friendster, which has around 1800 M
>>>>>> edges.
>>>>>> https://snap.stanford.edu/data/com-Friendster.html
>>>>>>
>>>>>> I am trying to run the Gelly built-in label propagation algorithm on
>>>>>> top of it.
>>>>>> As this is a very big dataset, I believe I am exceeding the available
>>>>>> RAM and the system is spilling to secondary storage, which then fails:
>>>>>>
>>>>>> Connected to JobManager at Actor[akka.tcp://flink@172.19.0.2:6123/user/jobmanager#894624508]
>>>>>> 12/01/2016 17:58:24 Job execution switched to status RUNNING.
>>>>>> 12/01/2016 17:58:24 DataSource (at main(App.java:33) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to SCHEDULED
>>>>>> 12/01/2016 17:58:24 DataSource (at main(App.java:33) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to DEPLOYING
>>>>>> 12/01/2016 17:58:24 DataSource (at main(App.java:33) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to RUNNING
>>>>>> 12/01/2016 17:58:24 Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to SCHEDULED
>>>>>> 12/01/2016 17:58:24 Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to DEPLOYING
>>>>>> 12/01/2016 17:58:24 Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to RUNNING
>>>>>> 12/01/2016 17:59:51 Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to FAILED
>>>>>> *java.lang.RuntimeException: Error obtaining the sorted input: Thread
>>>>>> 'SortMerger spilling thread' terminated due to an exception: No space
>>>>>> left on device*
>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1098)
>>>>>> at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:86)
>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
>>>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>> *Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
>>>>>> terminated due to an exception: No space left on device*
>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>>>>> Caused by: java.io.IOException: No space left on device
>>>>>> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>>>>>> at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
>>>>>> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>>>>>> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>>>>>> at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:211)
>>>>>> at org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest.write(AsynchronousFileIOChannel.java:344)
>>>>>> at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:502)
>>>>>>
>>>>>> I do not have secondary storage limitations on the host system, so I
>>>>>> believe the system would be able to handle whatever is spilled to
>>>>>> disk...
>>>>>> Perhaps this is a Docker limitation regarding the usage of the host's
>>>>>> secondary storage?
>>>>>>
>>>>>> Or is there perhaps some configuration or setting for the TaskManager
>>>>>> which I am missing?
>>>>>> Running Gelly's label propagation with this dataset and cluster
>>>>>> configuration, what would be the expected behavior if the system
>>>>>> consumes all the memory?
>>>>>>
>>>>>> I believe the SortMerger thread is associated with the following
>>>>>> mechanism described in this blog post:
>>>>>> https://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html
>>>>>>
>>>>>> *The Sort-Merge-Join works by first sorting both input data sets on
>>>>>> their join key attributes (Sort Phase) and merging the sorted data
>>>>>> sets as a second step (Merge Phase). The sort is done in-memory if the
>>>>>> local partition of a data set is small enough. Otherwise, an external
>>>>>> merge-sort is done by collecting data until the working memory is
>>>>>> filled, sorting it, writing the sorted data to the local filesystem,
>>>>>> and starting over by filling the working memory again with more
>>>>>> incoming data. After all input data has been received, sorted, and
>>>>>> written as sorted runs to the local file system, a fully sorted stream
>>>>>> can be obtained. This is done by reading the partially sorted runs
>>>>>> from the local filesystem and sort-merging the records on the fly.
>>>>>> Once the sorted streams of both inputs are available, both streams are
>>>>>> sequentially read and merge-joined in a zig-zag fashion by comparing
>>>>>> the sorted join key attributes, building join element pairs for
>>>>>> matching keys, and advancing the sorted stream with the lower join
>>>>>> key.*
>>>>>>
>>>>>> I am still investigating the possibility that Docker is at fault
>>>>>> regarding secondary storage limitations, but how would I go about
>>>>>> estimating the amount of disk space needed for this spilling on this
>>>>>> dataset?
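>>>>>> My own back-of-the-envelope attempt, assuming each edge is serialized
>>>>>> as a Tuple2<Long, Long> (16 bytes): getUndirected() doubles the
>>>>>> ~1800 M input edges to ~3.6 G records, so a single sorted copy of the
>>>>>> edge set would already be around 3.6e9 * 16 B = ~58 GB, and I imagine
>>>>>> an iterative algorithm materializes several such intermediate data
>>>>>> sets during execution. Is that a reasonable way to think about it?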
>>>>>> Thanks for your time,
>>>>>>
>>>>>> My best regards,
>>>>>>
>>>>>> Miguel E. Coimbra
>>>>>> Email: miguel.e.coim...@gmail.com
>>>>>> Skype: miguel.e.coimbra