Hi Miguel, I'm sorry, but AFAIK the situation has not changed.
Is it possible that you are calling execute() multiple times? In that case, the first and second graphs would be recomputed before the third graph is computed. That would explain the execution time increasing by roughly 15 seconds per run.

Best, Fabian

2017-11-26 17:45 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:

> Hello,
>
> I'm facing a problem in an algorithm where I would like to repeatedly
> update a DataSet representing a graph, perform some computation, output
> one or more DataSinks (such as a file on the local system) and then reuse
> the DataSet in the next iteration.
> I want to avoid spilling the results to disk at the end of an iteration
> and reading them back in the next iteration - the graph is very big and I
> do not wish to incur that time overhead.
> I want to reuse the full result DataSet of each iteration in the next
> one, and I want to save to disk a small percentage of the DataSet produced
> in each iteration.
> The space requirements are roughly constant - the number of edges in the
> graph increases by only 100 between iterations (an extremely low
> percentage of the original graph's edges), and the new edges are obtained
> using env.fromCollection(edgesToAdd).
>
> Although I am using Flink's Gelly API for graphs, I have no problem
> working directly with the underlying vertex and edge DataSet elements.
>
> Two ways to do this occur to me, but it seems neither is currently
> supported in Flink, as per Vasia's answer to this Stack Overflow question
> [1]:
>
> «Unfortunately, it is not currently possible to output intermediate
> results from a bulk iteration. You can only output the final result at
> the end of the iteration. Also, as you correctly noticed, Flink cannot
> efficiently unroll a while-loop or for-loop, so that won't work either.»
>
> *1.* I thought I could create a bulk iteration, perform the computation
> and, between iterations, output the result to the file system.
> However, as per Vasia's answer, this is not possible, and when I try it
> (for example, calculating a centrality metric for every vertex and
> dumping the results to disk) it produces the following exception on
> execution, as expected based on that information:
>
> org.apache.flink.api.common.InvalidProgramException: A data set that is
> part of an iteration was used as a sink or action. Did you forget to
> close the iteration?
>
> *2.* Using a for loop in my own program and triggering sequential Flink
> job executions.
> Problem: in this scenario, while I am able to use a DataSet produced in
> one iteration's Flink job (and dump the desired output information to
> disk) and pass it to the next Flink job, the computation time increases
> constantly.
> (I also tried manually starting a session which is kept open with
> env.startNewSession() before the loop - no impact.)
>
> Initial graph has 33511 vertices and 411578 edges.
> Added 113 vertices and 100 edges.
> 1-th graph now has 33524 vertices and 411678 edges (2.543 s).
> 1-th graph algorithm produced 33524 elements *(20.96 s)*.
> Added 222 vertices and 200 edges.
> 2-th graph now has 33536 vertices and 411778 edges (1.919 s).
> 2-th graph algorithm produced 33536 elements *(35.913 s)*.
> Added 326 vertices and 300 edges.
> 3-th graph now has 33543 vertices and 411878 edges (1.825 s).
> 3-th graph algorithm produced 33543 elements *(49.624 s)*.
> Added 436 vertices and 400 edges.
> 4-th graph now has 33557 vertices and 411978 edges (1.482 s).
> 4-th graph algorithm produced 33557 elements *(66.209 s)*.
>
> Note that the number of elements in the output DataSet is equal to the
> number of vertices in the graph.
> On iteration i of my program, the executed graph algorithm incorporates
> the result DataSet of iteration i - 1 by means of the
> g.joinWithVertices(previousResultDataSet, new RanksJoinFunction())
> function.
>
> The RanksJoinFunction is a simple forwarding mechanism to set the
> previous values:
>
> @FunctionAnnotation.ForwardedFieldsFirst("*->*")
> private static class RanksJoinFunction implements
>         VertexJoinFunction<Double, Double> {
>     @Override
>     public Double vertexJoin(final Double vertexValue,
>             final Double inputValue) throws Exception {
>         // Keep the value carried over from the previous iteration's result.
>         return inputValue;
>     }
> }
>
> I have also used Flink's plan visualizer to check for discrepancies
> between, for example, the first iteration and the tenth: the layout of
> the plan remains exactly the same, while the execution time continually
> increases for what should be the same amount of computation.
>
> *Bottom-line:* I was hoping someone could tell me how to overcome the
> performance bottleneck of the sequential job approach, or how to enable
> the output of intermediate results from Flink's bulk iterations.
> I believe others have stumbled upon this limitation before [2, 3].
> I have tested this on a dual-core i7 with 8 GB RAM on 64-bit Java 8,
> using a local environment:
>
> final Configuration conf = new Configuration();
> final LocalEnvironment lenv = (LocalEnvironment)
>         ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
> final ExecutionEnvironment env = lenv;
> env.getConfig().disableSysoutLogging().setParallelism(1);
>
> I wish to execute this on a cluster later on with a bigger dataset, so it
> is essential that I maximize the ability to reuse the DataSets that are
> distributed by the Flink runtime.
> This would allow me to avoid the performance bottleneck I described.
> Hopefully someone can shed light on this.
>
> Thanks for your attention.
>
> References:
>
> [1] https://stackoverflow.com/questions/37224140/possibility-of-saving-partial-outputs-from-bulk-iteration-in-flink-dataset/37352770#37352770
>
> [2] http://mail-archives.apache.org/mod_mbox/flink-dev/201605.mbox/%3CCY1PR0601MB1228D348B41B274B52BCD3AB94450@CY1PR0601MB1228.namprd06.prod.outlook.com%3E
>
> [3] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Intermediate-output-during-delta-iterations-td436.html
>
> Miguel E. Coimbra
> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
> Skype: miguel.e.coimbra
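For what it's worth, the effect Fabian describes - each env.execute() replaying the whole plan from its sources - can be sketched with a small toy model. This is plain Java with no Flink at all: the "plan" list and the names here are hypothetical stand-ins for a DataSet lineage, not actual Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for Flink's lazy execution model: the "plan" is a list of
// transformations, and execute() replays every step from the source,
// just as each env.execute() re-runs the full DataSet lineage.
public class LazyPlanDemo {

    // Returns the number of steps actually executed in this run.
    static int execute(List<String> plan) {
        int stepsRun = 0;
        for (String step : plan) {
            stepsRun++; // every earlier "graph" is recomputed as well
        }
        return stepsRun;
    }

    public static void main(String[] args) {
        List<String> plan = new ArrayList<>();
        for (int round = 1; round <= 4; round++) {
            // Extend the plan, like adding 100 edges plus a joinWithVertices.
            plan.add("graph-" + round);
            // Sequential job: recomputes graphs 1..round-1 before round.
            int cost = execute(plan);
            System.out.println("run " + round + " executed " + cost
                    + " graph computations");
        }
    }
}
```

The cost of run i grows linearly with i because graphs 1..i-1 are redone first, so the increase between consecutive runs is roughly constant - consistent with the ~15 s jumps between the 20.96 s, 35.913 s, 49.624 s and 66.209 s timings in the log above.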