Hello Fabian,

After increasing the message size akka parameter, the client resulted in
the following exception after some time.
This confirms that the JobManager never received the job request:

[WARNING]
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:294)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
        at
org.apache.flink.optimizer.costs.CostEstimator.costOperator(CostEstimator.java:78)
        at
org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:516)
        at
org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
        at
org.apache.flink.optimizer.dag.WorksetIterationNode.instantiate(WorksetIterationNode.java:344)
        at
org.apache.flink.optimizer.dag.TwoInputNode.addLocalCandidates(TwoInputNode.java:557)
        at
org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:478)
        at
org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
        at
org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
        at
org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
        at
org.apache.flink.optimizer.dag.DataSinkNode.getAlternativePlans(DataSinkNode.java:193)
        at
org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:309)
        at
org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at
org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at
org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at
org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at
org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at
org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at
org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at
org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at
org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
        at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:496)
        at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
        at
org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:349)
        at
org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:812)
        at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:443)
        at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:419)
        at
org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:208)
        at
org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:185)
        at
org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:172)
        at
pt.ulisboa.tecnico.graph.util.GraphSequenceTest.optimizedAccumulationLoop(GraphSequenceTest.java:304)
        at
pt.ulisboa.tecnico.graph.util.GraphSequenceTest.main(GraphSequenceTest.java:204)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

I hope I am not hitting a formal limit of Flink?

Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
Skype: miguel.e.coimbra

On 6 December 2017 at 08:48, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Miguel,
>
> if the message size would be the problem, the client should fail with an
> exception.
> What might happen, is that the client gets stuck while optimizing the
> program.
>
> You could take a stacktrace of the client process to identify at which
> part the client gets stuck.
>
> Best, Fabian
>
> 2017-12-06 3:01 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>
>> Hello Fabian,
>>
>> Thanks for the help.
>> I am interested in the duration of specific operators, so the fact that
>> parts of the execution are in pipeline is not a problem for me.
>> From my understanding, the automated way to approach this is to run the
>> Flink job with the web interface active and then make a REST call on the
>> appropriate job and parse the JSON.
>> I have implemented that and my program is able to read the duration of
>> specific operators.
>>
>> However, I'm facing another problem whose cause I haven't been able to
>> pinpoint.
>>
>> Testing on 1.4.0-SNAPSHOT.
>>
>> When launching a cluster (start-local.bat on Windows or start-cluster.sh
>> on Linux), the JobManager, the TaskManager(s) are launched and the web
>> front end becomes active (I can access it via browser) - everything is ok
>> so far.
>> The problem occurs when the number of operators in the plan increases.
>>
>> Consider that I execute my algorithm three (3) times through a single
>> Flink plan.
>> For three times, vertices and edges will be added to the graph (via Gelly
>> methods).
>> This is defined in a for loop in my program. For each iteration:
>>
>> // I add 100 edges to the graph, decomposed in a list of vertices and
>> edges
>> final Graph<Long, NullValue, NullValue> newGraph = graph
>>     .addVertices(verticesToAdd)
>>     .addEdges(edgesToAdd);
>>
>> // Generate identifications for the vertex counter.
>> final String vid = new AbstractID().toString();
>> newGraph.getVertices().output(new Utils.CountHelper<Vertex<Long,
>> NullValue>>(vid)).name("count()");
>> vids.put(executionCounter, vid);
>>
>> // Generate identifications for the edge counter.
>> final String eid = new AbstractID().toString();
>> newGraph.getEdges().output(new Utils.CountHelper<Edge<Long,
>> NullValue>>(eid)).name("count()");
>> eids.put(executionCounter, eid);
>>
>> So far I have created 2 sinks in the current iteration in my regular Java
>> program.
>> Then I execute my graph algorithm with the previous results:
>>
>> // Execute the graph algorithm and output some of its results
>> result = newGraph.run(new MyGraphAlgorithm<>(previousResults));
>> result.sortPartition(1, Order.DESCENDING).setParalleli
>> sm(1).first(1000).output(outputFormat).name("Store results");
>> previousResults = result;
>>
>> This creates one additional sink via the output function.
>> So for three (3) iterations, I define nine (9) sinks in the plan, call
>> execute() and afterward retrieve the contents of the sinks.
>> This works fine so far.
>>
>> If I run with 10 iterations, I will be creating 30 sinks.
>> The problem is that for 10 iterations, the Flink client program just
>> hangs on the execute() call forever (execution time should increase
>> linearly with the number of iterations).
>> For 3 iterations, execute() proceeds normally and it takes around 20
>> seconds per iteration, so 3 iterations is 60 seconds and 10 should be
>> around 3 minutes.
>> After five hours, there was no progress.
>>
>> Furthermore, I checked the web monitor periodically and there was not a
>> single job.
>> It seems that the client program is simply not sending the job to the
>> cluster if the job plan becomes too big.
>> The exact same compiled program, with 3 iterations (via argument) works,
>> but with 10 (via argument) it simply falls silent.
>>
>> I am trying to understand what may be the problem:
>>
>> - An internal limit in the number of datasinks or operators in the plan?
>>
>> - A limit in message size preventing the client from sending the job?
>> (see: https://issues.apache.org/jira/browse/FLINK-2603 )
>> I have tried increasing the akka.framesize to 256000kB in the Flink
>> server flink-conf.yaml config and in the client program when creating
>> the remote environment with:
>>
>> Configuration clientConfig = new Configuration();
>> final String akkaFrameSize = "256000kB";
>> ConfigOption<String> akkaConfig = ConfigOptions.key(AkkaOptions.
>> FRAMESIZE.key()).defaultValue(akkaFrameSize);
>> clientConfig.setString(akkaConfig, akkaFrameSize);
>> env = new RemoteEnvironment(remoteAddress, remotePort, clientConfig,
>> jarFiles, null);
>>
>> I have run out of ideas with respect to the causes.
>> Hoping you may be able to shed some light.
>>
>> Thank you for your time,
>>
>> Best regards,
>>
>> Miguel E. Coimbra
>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>> Skype: miguel.e.coimbra
>>
>> On 29 November 2017 at 10:23, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> The monitoring REST interface provides detailed stats about a job, its
>>> tasks, and processing verticies including their start and end time [1].
>>>
>>> However, it is not trivial to make sense of the execution times because
>>> Flink uses pipelined shuffles by default.
>>> That means that the execution of multiple operators can overlap. For
>>> example the records that are produced by a GroupReduce can be processed by
>>> a Map, shuffled, and sorted (for another GroupReduce) in a pipelined
>>> fashion.
>>> Hence, all these operations run at the same time. You can disable this
>>> behavior to some extend by setting the execution mode to batched shuffles
>>> [2].
>>> However, this will likely have a negative impact on the overall
>>> execution time.
>>>
>>> Best, Fabian
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/
>>> monitoring/rest_api.html#details-of-a-running-or-completed-job
>>> [2] https://stackoverflow.com/questions/33691612/apache-flink-st
>>> epwise-execution/33691957#33691957
>>>
>>>
>>>
>>> 2017-11-29 0:44 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>>>
>>>> Hello,
>>>>
>>>> You're right, I was overlooking that.
>>>> With your suggestion, I now just define a different sink in each
>>>> iteration of the loop.
>>>> Then they all output to disk when executing a single bigger plan.
>>>>
>>>> I have one more question: I know I can retrieve the total time this
>>>> single job takes to execute, but what if I want to know the time taken for
>>>> specific operators in the dag?
>>>> Is there some functionality in the Flink Batch API akin to counting
>>>> elements but for measuring time instead?
>>>> For example, if I am not mistaken, an operator can be executed in
>>>> parallel or serially (with a parallelism of one).
>>>>
>>>> Is there a straightforward way to get the time taken by the operator's
>>>> tasks?
>>>> In a way that I could:
>>>>
>>>> a) just get the time of a single task (if running serially) to get the
>>>> total operator execution time;
>>>> b) know the time taken by each parallel component of the operator's
>>>> execution so I could know where and what was the "lagging element" in the
>>>> operator's execution.
>>>>
>>>> Is this possible? I was hoping I could retrieve this information in the
>>>> Java program itself and avoid processing logs.
>>>>
>>>> Thanks again.
>>>>
>>>> Best regards,
>>>>
>>>>
>>>> Miguel E. Coimbra
>>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>>> Skype: miguel.e.coimbra
>>>>
>>>> On 28 November 2017 at 08:56, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> by calling result.count(), you compute the complete plan from the
>>>>> beginning and not just the operations you added since the last execution.
>>>>> Looking at the output you posted, each step takes about 15 seconds
>>>>> (with about 5 secs of initialization).
>>>>> So the 20 seconds of the first step include initialization + 1st step.
>>>>> The 35 seconds on the second step include initialization, 1st step +
>>>>> 2nd step.
>>>>> If you don't call count on the intermediate steps, you can compute the
>>>>> 4th step in 65 seconds.
>>>>>
>>>>> Implementing a caching operator would be a pretty huge effort because
>>>>> you need to touch code at many places such as the API, optimizer, runtime,
>>>>> scheduling, etc.
>>>>> The documentation you found should still be applicable. There hasn't
>>>>> been major additions to the DataSet API and runtime in the last releases.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>>
>>>>>
>>>>> 2017-11-28 9:14 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>>>>>
>>>>>> Hello Fabian,
>>>>>>
>>>>>> Thank you for the reply.
>>>>>> I was hoping the situation had in fact changed.
>>>>>>
>>>>>> As far as I know, I am not calling execute() directly even once - it
>>>>>> is being called implicitly by simple DataSink elements added to the
>>>>>> plan through count():
>>>>>>
>>>>>> System.out.println(String.format("%d-th graph algorithm produced %d
>>>>>> elements. (%d.%d s).",
>>>>>>                             executionCounter,
>>>>>>                             *result.count()*, // this would trigger
>>>>>> execution...
>>>>>>                             env.getLastJobExecutionResult(
>>>>>> ).getNetRuntime(TimeUnit.SECONDS),
>>>>>>                             env.getLastJobExecutionResult(
>>>>>> ).getNetRuntime(TimeUnit.MILLISECONDS) % 1000));
>>>>>>
>>>>>>
>>>>>> I have taken a look at Flink's code base (e.g. how the dataflow dag
>>>>>> is processed with classes such as  
>>>>>> org.apache.flink.optimizer.traversals.GraphCreatingVisitor,
>>>>>> org.apache.flink.api.java.operators.OperatorTranslation) but I'm not
>>>>>> sure on the most direct way to achieve this.
>>>>>> Perhaps I missed some online documentation that would help to get a
>>>>>> grip on how to contribute to the different parts of Flink?
>>>>>>
>>>>>> I did find some information which hints at implementing this sort of
>>>>>> thing (such as adding custom operators) but it was associated to an old
>>>>>> version of Flink:
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.1/
>>>>>> internals/add_operator.html
>>>>>> However, as far as I know there is no equivalent page in the current
>>>>>> online stable or snapshot documentation.
>>>>>>
>>>>>> What would be the best way to go about this?
>>>>>>
>>>>>> It really seems that the DataSet stored in the result variable is
>>>>>> always representing an increasing sequence of executions and not just the
>>>>>> results of the last execution.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Best regards,
>>>>>>
>>>>>> Miguel E. Coimbra
>>>>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>>>>> Skype: miguel.e.coimbra
>>>>>>
>>>>>> On 27 November 2017 at 22:56, Fabian Hueske <fhue...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Miguel,
>>>>>>>
>>>>>>> I'm sorry but AFAIK, the situation has not changed.
>>>>>>>
>>>>>>> Is it possible that you are calling execute() multiple times?
>>>>>>> In that case, the 1-st and 2-nd graph would be recomputed before the
>>>>>>> 3-rd graph is computed.
>>>>>>> That would explain the increasing execution time of 15 seconds.
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>> 2017-11-26 17:45 GMT+01:00 Miguel Coimbra <
>>>>>>> miguel.e.coim...@gmail.com>:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> I'm facing a problem in an algorithm where I would like to
>>>>>>>> constantly update a DataSet representing a graph, perform some
>>>>>>>> computation, output one or more DataSink (such as a file on the
>>>>>>>> local system) and then reuse the DataSet for a next iteration.
>>>>>>>> ​I want to avoid spilling the results to disk at the end of an
>>>>>>>> iteration and to read it back in the next iterations - the graph is 
>>>>>>>> very
>>>>>>>> big and I do not wish to incur that time overhead.
>>>>>>>> I want to reuse the full result DataSet of each iteration in the
>>>>>>>> next one and I want to save to disk a small percentage of the produced
>>>>>>>> DataSet for each iteration.
>>>>>>>> The space complexity is rather constant - the number of edges in
>>>>>>>> the graph increases by only 100 between iterations (which is an 
>>>>>>>> extremely
>>>>>>>> low percentage of the original graph's edges) and is obtained using
>>>>>>>> env.fromCollection(edgesToAdd).
>>>>>>>>
>>>>>>>> Although I am using Flink's Gelly API for graphs, I have no problem
>>>>>>>> working directly with the underlying vertex and edge DataSet
>>>>>>>> elements.​
>>>>>>>>
>>>>>>>> Two ways to do this occur to me, but it seems both are currently
>>>>>>>> not supported in Flink, as per Vasia's answer to this Stack Overflow
>>>>>>>> quest​ion [1]:
>>>>>>>>
>>>>>>>> «​*​*
>>>>>>>>
>>>>>>>>
>>>>>>>> *Unfortunately, it is not currently possible to output intermediate
>>>>>>>> results from a bulk iteration.You can only output the final result at 
>>>>>>>> the
>>>>>>>> end of the iteration.Also, as you correctly noticed, Flink cannot
>>>>>>>> efficiently unroll a while-loop or for-loop, so that won't work 
>>>>>>>> either.»*
>>>>>>>>
>>>>>>>> *1.* I thought I could create a bulk iteration, perform the
>>>>>>>> computation and between iterations, output the result to the file 
>>>>>>>> system.
>>>>>>>> However, this is not possible, as per Vasia's answer, and produces
>>>>>>>> the following exception on execution when I try (for example, to 
>>>>>>>> calculate
>>>>>>>> a centrality metric for every vertex and dump the results to disk), as
>>>>>>>> expected based on that information:
>>>>>>>>
>>>>>>>> org.apache.flink.api.common.InvalidProgramException: A data set
>>>>>>>> that is part of an iteration was used as a sink or action. Did you 
>>>>>>>> forget
>>>>>>>> to close the iteration?
>>>>>>>>
>>>>>>>> *2.* Using a for loop in my own program and triggering sequential
>>>>>>>> Flink job executions.
>>>>>>>> Problem: in this scenario, while I am able to use a DataSet produced
>>>>>>>> in an iteration's Flink job (and dump the desired output information to
>>>>>>>> disk) and pass it to the next Flink job, the computation time increases
>>>>>>>> constantly:
>>>>>>>> (I also tried manually starting a session which is kept open with
>>>>>>>> env.startNewSession() before the loop - no impact)
>>>>>>>>
>>>>>>>> ​​
>>>>>>>> Initial graph has 33511 vertices and 411578 edges.
>>>>>>>> Added 113 vertices and 100 edges.
>>>>>>>> 1-th graph now has 33524 vertices and 411678 edges (2.543 s).
>>>>>>>> 1-th graph algorithm produced 33524 elements. *(20.96 s)*.
>>>>>>>> Added 222 vertices and 200 edges.
>>>>>>>> 2-th graph now has 33536 vertices and 411778 edges (1.919 s).
>>>>>>>> 2-th graph algorithm produced 33536 elements. *(35.913 s)*.
>>>>>>>> Added 326 vertices and 300 edges.
>>>>>>>> 3-th graph now has 33543 vertices and 411878 edges (1.825 s).
>>>>>>>> 3-th graph algorithm produced 33543 elements. *(49.624 s)*.
>>>>>>>> Added 436 vertices and 400 edges.
>>>>>>>> 4-th graph now has 33557 vertices and 411978 edges (1.482 s).
>>>>>>>> 4-th graph algorithm produced 33557 elements. *(66.209 s)*.
>>>>>>>>
>>>>>>>> Note that the number of elements in the output DataSet is equal to
>>>>>>>> the number of vertices in the graph.
>>>>>>>> On iteration i in my program, the executed graph algorithm
>>>>>>>> incorporates the result DataSet of iteration i - 1 by means of the
>>>>>>>> g.joinWithVertices(previousResultDataSet, new RanksJoinFunction())
>>>>>>>> function.
>>>>>>>>
>>>>>>>> The VertexJoinFunction is a simple forwarding mechanism to set the
>>>>>>>> previous values:
>>>>>>>>
>>>>>>>> @FunctionAnnotation.ForwardedFieldsFirst("*->*")
>>>>>>>> private static class RanksJoinFunction implements
>>>>>>>> VertexJoinFunction<Double, Double> {
>>>>>>>>     @Override
>>>>>>>>     public Double vertexJoin(final Double vertexValue, final Double
>>>>>>>> inputValue) throws Exception {
>>>>>>>>         return inputValue;
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>> ​I have also used Flink's plan visualizer to check for
>>>>>>>> discrepancies between the first iteration and the tenth (for example), 
>>>>>>>> but
>>>>>>>> the layout of the plan remains exactly the same while the execution 
>>>>>>>> time
>>>>>>>> continually increases for what should be the same amount of 
>>>>>>>> computations.
>>>>>>>>
>>>>>>>> *Bottom-line:* ​I was hoping someone could tell me how to overcome
>>>>>>>> the performance bottleneck using the sequential job approach or 
>>>>>>>> enabling
>>>>>>>> the output of intermediate results using Flink's Bulk Iterations.
>>>>>>>> ​​​I believe others have stumbled upon this limitation before [2,
>>>>>>>> 3].​​
>>>>>>>> I have tested this on a dual-core i7 with 8 GB RAM on Java 8 64-bit
>>>>>>>> using a local environment:
>>>>>>>>
>>>>>>>> final Configuration conf = new Configuration();
>>>>>>>> final LocalEnvironment lenv = (LocalEnvironment)
>>>>>>>> ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>>>>>>>> final ExecutionEnvironment env = lenv;
>>>>>>>> env.getConfig().disableSysoutLogging().setParallelism(1);
>>>>>>>>
>>>>>>>> ​I wish to ​execute in a cluster later on with a bigger dataset, so
>>>>>>>> it would be essential that to maximize the ability to reuse the 
>>>>>>>> DataSets
>>>>>>>> that are distributed by the Flink runtime.
>>>>>>>> This would allow me to avoid the performance bottleneck that I
>>>>>>>> described.
>>>>>>>> ​Hopefully someone may shed light on this.​
>>>>>>>>
>>>>>>>> ​Thanks for your attention.​
>>>>>>>>
>>>>>>>>
>>>>>>>> ​References:​
>>>>>>>>
>>>>>>>> ​[1] https://stackoverflow.com/questions/37224140/possibility-of-
>>>>>>>> saving-partial-outputs-from-bulk-iteration-in-flink-dataset/
>>>>>>>> 37352770#37352770​
>>>>>>>>
>>>>>>>> ​[2] http://mail-archives.apache.org/mod_mbox/flink-dev/201605.mb
>>>>>>>> ox/%3CCY1PR0601MB1228D348B41B274B52BCD3AB94450@CY1PR0601MB12
>>>>>>>> 28.namprd06.prod.outlook.com%3E​
>>>>>>>>
>>>>>>>> ​[3] http://apache-flink-user-mailing-list-archive.2336050.n4.nab
>>>>>>>> ble.com/Intermediate-output-during-delta-iterations-td436.html​
>>>>>>>>
>>>>>>>> Miguel E. Coimbra
>>>>>>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>>>>>>> Skype: miguel.e.coimbra
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to