Hi Miguel,

I'm sorry but AFAIK, the situation has not changed.

Is it possible that you are calling execute() multiple times?
In that case, the first and second graphs would be recomputed before the
third graph is computed.
That would explain the execution time increasing by roughly 15 seconds per
run.
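
For illustration, this is the kind of loop I mean (a minimal sketch with
made-up names and data, not your actual code): every call to execute()
translates and runs the full plan built on the environment so far, so run i
re-executes the transformations of runs 1 to i-1 as well.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class RepeatedExecuteSketch {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Long> data = env.generateSequence(1, 1000);
        for (int i = 1; i <= 3; i++) {
            // Each pass appends another map operator to the same plan.
            data = data.map(new MapFunction<Long, Long>() {
                @Override
                public Long map(final Long value) {
                    return value + 1;
                }
            });
            data.writeAsText("/tmp/out-" + i);
            // Translates and runs the whole plan again, including the
            // operators added in earlier passes, before this run's sink.
            env.execute("run " + i);
        }
    }
}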

Best, Fabian

2017-11-26 17:45 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:

> Hello,
>
> I'm facing a problem in an algorithm where I would like to constantly
> update a DataSet representing a graph, perform some computation, write
> one or more DataSinks (such as a file on the local file system) and then
> reuse the DataSet in the next iteration.
> I want to avoid spilling the results to disk at the end of an iteration
> and reading them back in the next one - the graph is very big and I do
> not wish to incur that time overhead.
> I want to reuse the full result DataSet of each iteration in the next one,
> while saving only a small percentage of the produced DataSet to disk on
> each iteration.
> The space requirements are roughly constant - the number of edges in the
> graph increases by only 100 between iterations (an extremely low
> percentage of the original graph's edge count), and the new edges are
> obtained using env.fromCollection(edgesToAdd).
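>
> For concreteness, the per-iteration update is essentially the following
> (simplified sketch; nextEdgeBatch() is a stand-in for my actual source of
> new edges, and the new vertices are handled analogously):
>
> // Turn the ~100 new edges into a DataSet, union it with the current
> // edge DataSet, and rebuild the Gelly Graph from the result.
> final List<Edge<Long, Double>> edgesToAdd = nextEdgeBatch();
> final DataSet<Edge<Long, Double>> newEdges = env.fromCollection(edgesToAdd);
> final Graph<Long, Double, Double> graph2 = Graph.fromDataSet(
>         graph.getVertices(), graph.getEdges().union(newEdges), env);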
>
> Although I am using Flink's Gelly API for graphs, I have no problem
> working directly with the underlying vertex and edge DataSet elements.
>
> Two ways to do this occur to me, but it seems neither is currently
> supported in Flink, as per Vasia's answer to this Stack Overflow question
> [1]:
>
> «*Unfortunately, it is not currently possible to output intermediate
> results from a bulk iteration. You can only output the final result at
> the end of the iteration. Also, as you correctly noticed, Flink cannot
> efficiently unroll a while-loop or for-loop, so that won't work either.*»
>
> *1.* I thought I could create a bulk iteration, perform the computation
> and, between iterations, output the result to the file system.
> However, as per Vasia's answer, this is not possible; when I try (for
> example, calculating a centrality metric for every vertex and dumping the
> results to disk), execution produces the following exception, as expected
> given that information:
>
> org.apache.flink.api.common.InvalidProgramException: A data set that is
> part of an iteration was used as a sink or action. Did you forget to close
> the iteration?
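>
> Reduced to a minimal sketch (computeOneSuperstep() is a stand-in for my
> actual per-superstep logic), the failing pattern is:
>
> // Open a bulk iteration over the rank DataSet.
> final IterativeDataSet<Tuple2<Long, Double>> iteration = initialRanks.iterate(10);
> final DataSet<Tuple2<Long, Double>> newRanks = computeOneSuperstep(iteration);
> // Attaching a sink to a DataSet that is part of the open iteration is
> // what triggers the InvalidProgramException above.
> newRanks.writeAsText("/tmp/intermediate-ranks");
> final DataSet<Tuple2<Long, Double>> ranks = iteration.closeWith(newRanks);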
>
> *2.* Using a for loop in my own program and triggering sequential Flink
> job executions.
> Problem: in this scenario, while I am able to use a DataSet produced in
> one iteration's Flink job (and dump the desired output information to
> disk) and pass it on to the next Flink job, the computation time increases
> steadily:
> (I also tried manually starting a session which is kept open with
> env.startNewSession() before the loop - no impact.)
>
> Initial graph has 33511 vertices and 411578 edges.
> Added 113 vertices and 100 edges.
> 1-th graph now has 33524 vertices and 411678 edges (2.543 s).
> 1-th graph algorithm produced 33524 elements. *(20.96 s)*.
> Added 222 vertices and 200 edges.
> 2-th graph now has 33536 vertices and 411778 edges (1.919 s).
> 2-th graph algorithm produced 33536 elements. *(35.913 s)*.
> Added 326 vertices and 300 edges.
> 3-th graph now has 33543 vertices and 411878 edges (1.825 s).
> 3-th graph algorithm produced 33543 elements. *(49.624 s)*.
> Added 436 vertices and 400 edges.
> 4-th graph now has 33557 vertices and 411978 edges (1.482 s).
> 4-th graph algorithm produced 33557 elements. *(66.209 s)*.
>
> Note that the number of elements in the output DataSet is equal to the
> number of vertices in the graph.
> On iteration i of my program, the executed graph algorithm incorporates
> the result DataSet of iteration i - 1 via the call
> g.joinWithVertices(previousResultDataSet, new RanksJoinFunction()).
>
> The RanksJoinFunction is a simple forwarding mechanism that sets the
> previous values:
>
> @FunctionAnnotation.ForwardedFieldsFirst("*->*")
> private static class RanksJoinFunction
>         implements VertexJoinFunction<Double, Double> {
>     @Override
>     public Double vertexJoin(final Double vertexValue,
>             final Double inputValue) throws Exception {
>         // Keep the value coming from the previous result DataSet.
>         return inputValue;
>     }
> }
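>
> For completeness, the join itself is invoked between jobs as follows
> (previousResultDataSet is a DataSet<Tuple2<Long, Double>> holding each
> vertex ID and its previous rank):
>
> final Graph<Long, Double, Double> g2 =
>         g.joinWithVertices(previousResultDataSet, new RanksJoinFunction());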
>
> I have also used Flink's plan visualizer to check for discrepancies
> between the first iteration and the tenth (for example), but the layout
> of the plan remains exactly the same, while the execution time continually
> increases for what should be the same amount of computation.
>
> *Bottom-line:* I was hoping someone could tell me how to overcome this
> performance bottleneck, either with the sequential-job approach or by
> enabling the output of intermediate results from Flink's bulk iterations.
> I believe others have stumbled upon this limitation before [2, 3].
> I have tested this on a dual-core i7 with 8 GB RAM, on 64-bit Java 8,
> using a local environment:
>
> final Configuration conf = new Configuration();
> final LocalEnvironment lenv = (LocalEnvironment)
>         ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
> final ExecutionEnvironment env = lenv;
> env.getConfig().disableSysoutLogging().setParallelism(1);
>
> I wish to execute this on a cluster later on with a bigger dataset, so it
> is essential to maximize the reuse of the DataSets that are distributed
> by the Flink runtime; this would allow me to avoid the performance
> bottleneck that I described.
> Hopefully someone may shed light on this.
>
> Thanks for your attention.
>
>
> References:
>
> [1] https://stackoverflow.com/questions/37224140/possibility-of-saving-partial-outputs-from-bulk-iteration-in-flink-dataset/37352770#37352770
>
> [2] http://mail-archives.apache.org/mod_mbox/flink-dev/201605.mbox/%3CCY1PR0601MB1228D348B41B274B52BCD3AB94450@CY1PR0601MB1228.namprd06.prod.outlook.com%3E
>
> [3] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Intermediate-output-during-delta-iterations-td436.html
>
> Miguel E. Coimbra
> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
> Skype: miguel.e.coimbra
>
