Hello Fabian,

After increasing the Akka message size parameter (akka.framesize), the client failed with the following exception after some time. This confirms that the JobManager never received the job request:
[WARNING]
java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:294)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at org.apache.flink.optimizer.costs.CostEstimator.costOperator(CostEstimator.java:78)
    at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:516)
    at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
    at org.apache.flink.optimizer.dag.WorksetIterationNode.instantiate(WorksetIterationNode.java:344)
    at org.apache.flink.optimizer.dag.TwoInputNode.addLocalCandidates(TwoInputNode.java:557)
    at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:478)
    at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
    at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
    at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
    at org.apache.flink.optimizer.dag.DataSinkNode.getAlternativePlans(DataSinkNode.java:193)
    at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:309)
    at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
    at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
    at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
    at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
    at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
    at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
    at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
    at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
    at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
    at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:496)
    at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
    at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:349)
    at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:812)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:443)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:419)
    at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:208)
    at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:185)
    at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:172)
    at pt.ulisboa.tecnico.graph.util.GraphSequenceTest.optimizedAccumulationLoop(GraphSequenceTest.java:304)
    at pt.ulisboa.tecnico.graph.util.GraphSequenceTest.main(GraphSequenceTest.java:204)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

I hope I am not hitting a hard limit of Flink?
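For reference, a way to reproduce just the client-side plan handling without involving the cluster is ExecutionEnvironment#getExecutionPlan(): instead of submitting, it builds the plan and (at least with a local environment in 1.x, as far as I can tell) runs it through the same client-side optimizer to produce a JSON dump. If this call alone also hangs or runs out of memory as the iteration count grows, that would point at the optimizer's plan enumeration rather than at the Akka message size. A minimal sketch; the generateSequence/DiscardingOutputFormat part is only a stand-in for the real loop in the quoted message below:

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;

public class PlanSizeCheck {
    public static void main(final String[] args) throws Exception {
        // A local environment is enough: getExecutionPlan() stays entirely on the
        // client side and never submits a job to a cluster.
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the real program: build the same loop and sinks as in the
        // job that hangs, then dump the plan instead of executing it.
        env.generateSequence(1, 100).output(new DiscardingOutputFormat<Long>());

        final String planJson = env.getExecutionPlan();
        System.out.println("Plan JSON has " + planJson.length() + " characters");
        Files.write(Paths.get("plan.json"), planJson.getBytes(StandardCharsets.UTF_8));
    }
}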
Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
Skype: miguel.e.coimbra

On 6 December 2017 at 08:48, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Miguel,
>
> If the message size were the problem, the client should fail with an exception.
> What might happen is that the client gets stuck while optimizing the program.
>
> You could take a stacktrace of the client process to identify at which part the client gets stuck.
>
> Best, Fabian
>
> 2017-12-06 3:01 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>
>> Hello Fabian,
>>
>> Thanks for the help.
>> I am interested in the duration of specific operators, so the fact that parts of the execution are pipelined is not a problem for me.
>> From my understanding, the automated way to approach this is to run the Flink job with the web interface active and then make a REST call on the appropriate job and parse the JSON.
>> I have implemented that and my program is able to read the duration of specific operators.
>>
>> However, I'm facing another problem whose cause I haven't been able to pinpoint.
>>
>> Testing on 1.4.0-SNAPSHOT.
>>
>> When launching a cluster (start-local.bat on Windows or start-cluster.sh on Linux), the JobManager and the TaskManager(s) are launched and the web front end becomes active (I can access it via browser) - everything is ok so far.
>> The problem occurs when the number of operators in the plan increases.
>>
>> Consider that I execute my algorithm three (3) times through a single Flink plan.
>> Three times, vertices and edges will be added to the graph (via Gelly methods).
>> This is defined in a for loop in my program. For each iteration:
>>
>> // I add 100 edges to the graph, decomposed in a list of vertices and edges
>> final Graph<Long, NullValue, NullValue> newGraph = graph
>>     .addVertices(verticesToAdd)
>>     .addEdges(edgesToAdd);
>>
>> // Generate identifications for the vertex counter.
>> final String vid = new AbstractID().toString();
>> newGraph.getVertices().output(new Utils.CountHelper<Vertex<Long, NullValue>>(vid)).name("count()");
>> vids.put(executionCounter, vid);
>>
>> // Generate identifications for the edge counter.
>> final String eid = new AbstractID().toString();
>> newGraph.getEdges().output(new Utils.CountHelper<Edge<Long, NullValue>>(eid)).name("count()");
>> eids.put(executionCounter, eid);
>>
>> So far I have created 2 sinks in the current iteration in my regular Java program.
>> Then I execute my graph algorithm with the previous results:
>>
>> // Execute the graph algorithm and output some of its results
>> result = newGraph.run(new MyGraphAlgorithm<>(previousResults));
>> result.sortPartition(1, Order.DESCENDING).setParallelism(1).first(1000).output(outputFormat).name("Store results");
>> previousResults = result;
>>
>> This creates one additional sink via the output function.
>> So for three (3) iterations, I define nine (9) sinks in the plan, call execute() and afterward retrieve the contents of the sinks.
>> This works fine so far.
>>
>> If I run with 10 iterations, I will be creating 30 sinks.
>> The problem is that for 10 iterations, the Flink client program just hangs on the execute() call forever (execution time should increase linearly with the number of iterations).
>> For 3 iterations, execute() proceeds normally and takes around 20 seconds per iteration, so 3 iterations take about 60 seconds and 10 should take around 3 minutes.
>> After five hours, there was no progress.
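For completeness, this is roughly how the counts are read back after the single execute() call in the loop quoted above. It is only a sketch: it assumes vids and eids are Map<Long, String> instances, env is the RemoteEnvironment from the snippet further down, and it relies on the same accumulator mechanism that DataSet#count() uses internally (Utils.CountHelper registers a counter under the given id).

// Needs java.util.Map and org.apache.flink.api.common.JobExecutionResult.
final JobExecutionResult jobResult = env.execute("graph sequence");
for (final Map.Entry<Long, String> entry : vids.entrySet()) {
    // CountHelper registered a long accumulator under this id when the sink ran.
    final long vertexCount = jobResult.<Long>getAccumulatorResult(entry.getValue());
    System.out.println(entry.getKey() + "-th graph has " + vertexCount + " vertices");
}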
>> Furthermore, I checked the web monitor periodically and there was not a single job.
>> It seems that the client program is simply not sending the job to the cluster if the job plan becomes too big.
>> The exact same compiled program works with 3 iterations (via argument), but with 10 (via argument) it simply falls silent.
>>
>> I am trying to understand what may be the problem:
>>
>> - An internal limit on the number of data sinks or operators in the plan?
>>
>> - A limit on message size preventing the client from sending the job?
>> (see: https://issues.apache.org/jira/browse/FLINK-2603 )
>> I have tried increasing akka.framesize to 256000kB in the server's flink-conf.yaml and in the client program when creating the remote environment with:
>>
>> Configuration clientConfig = new Configuration();
>> final String akkaFrameSize = "256000kB";
>> ConfigOption<String> akkaConfig = ConfigOptions.key(AkkaOptions.FRAMESIZE.key()).defaultValue(akkaFrameSize);
>> clientConfig.setString(akkaConfig, akkaFrameSize);
>> env = new RemoteEnvironment(remoteAddress, remotePort, clientConfig, jarFiles, null);
>>
>> I have run out of ideas with respect to the causes.
>> Hoping you may be able to shed some light.
>>
>> Thank you for your time,
>>
>> Best regards,
>>
>> Miguel E. Coimbra
>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>> Skype: miguel.e.coimbra
>>
>> On 29 November 2017 at 10:23, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> The monitoring REST interface provides detailed stats about a job, its tasks, and processing vertices, including their start and end time [1].
>>>
>>> However, it is not trivial to make sense of the execution times because Flink uses pipelined shuffles by default.
>>> That means that the execution of multiple operators can overlap. For example, the records that are produced by a GroupReduce can be processed by a Map, shuffled, and sorted (for another GroupReduce) in a pipelined fashion.
>>> Hence, all these operations run at the same time. You can disable this behavior to some extent by setting the execution mode to batched shuffles [2].
>>> However, this will likely have a negative impact on the overall execution time.
>>>
>>> Best, Fabian
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/rest_api.html#details-of-a-running-or-completed-job
>>> [2] https://stackoverflow.com/questions/33691612/apache-flink-stepwise-execution/33691957#33691957
>>>
>>> 2017-11-29 0:44 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>>>
>>>> Hello,
>>>>
>>>> You're right, I was overlooking that.
>>>> With your suggestion, I now just define a different sink in each iteration of the loop.
>>>> Then they all output to disk when executing a single bigger plan.
>>>>
>>>> I have one more question: I know I can retrieve the total time this single job takes to execute, but what if I want to know the time taken by specific operators in the DAG?
>>>> Is there some functionality in the Flink Batch API akin to counting elements but for measuring time instead?
>>>> For example, if I am not mistaken, an operator can be executed in parallel or serially (with a parallelism of one).
>>>>
>>>> Is there a straightforward way to get the time taken by the operator's tasks?
>>>> In a way that I could:
>>>>
>>>> a) just get the time of a single task (if running serially) to get the total operator execution time;
>>>> b) know the time taken by each parallel component of the operator's execution, so I could know which part was the "lagging element" in the operator's execution.
>>>>
>>>> Is this possible? I was hoping I could retrieve this information in the Java program itself and avoid processing logs.
>>>>
>>>> Thanks again.
>>>>
>>>> Best regards,
>>>>
>>>> Miguel E. Coimbra
>>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>>> Skype: miguel.e.coimbra
>>>>
>>>> On 28 November 2017 at 08:56, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> by calling result.count(), you compute the complete plan from the beginning and not just the operations you added since the last execution.
>>>>> Looking at the output you posted, each step takes about 15 seconds (with about 5 secs of initialization).
>>>>> So the 20 seconds of the first step include initialization + 1st step.
>>>>> The 35 seconds of the second step include initialization, 1st step + 2nd step.
>>>>> If you don't call count on the intermediate steps, you can compute the 4th step in 65 seconds.
>>>>>
>>>>> Implementing a caching operator would be a pretty huge effort because you need to touch code at many places such as the API, optimizer, runtime, scheduling, etc.
>>>>> The documentation you found should still be applicable. There haven't been any major additions to the DataSet API and runtime in the last releases.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2017-11-28 9:14 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>>>>>
>>>>>> Hello Fabian,
>>>>>>
>>>>>> Thank you for the reply.
>>>>>> I was hoping the situation had in fact changed.
>>>>>>
>>>>>> As far as I know, I am not calling execute() directly even once - it is being called implicitly by simple DataSink elements added to the plan through count():
>>>>>>
>>>>>> System.out.println(String.format("%d-th graph algorithm produced %d elements. (%d.%d s).",
>>>>>>     executionCounter,
>>>>>>     result.count(), // this would trigger execution...
>>>>>>     env.getLastJobExecutionResult().getNetRuntime(TimeUnit.SECONDS),
>>>>>>     env.getLastJobExecutionResult().getNetRuntime(TimeUnit.MILLISECONDS) % 1000));
>>>>>>
>>>>>> I have taken a look at Flink's code base (e.g. how the dataflow DAG is processed with classes such as org.apache.flink.optimizer.traversals.GraphCreatingVisitor and org.apache.flink.api.java.operators.OperatorTranslation) but I'm not sure of the most direct way to achieve this.
>>>>>> Perhaps I missed some online documentation that would help to get a grip on how to contribute to the different parts of Flink?
>>>>>>
>>>>>> I did find some information which hints at implementing this sort of thing (such as adding custom operators), but it was associated with an old version of Flink:
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.1/internals/add_operator.html
>>>>>> However, as far as I know there is no equivalent page in the current online stable or snapshot documentation.
>>>>>>
>>>>>> What would be the best way to go about this?
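For context on why each count() re-runs everything: as far as I can tell from the 1.x sources, DataSet#count() is just a sink plus a full execute(), roughly like the following (paraphrased sketch, not the verbatim implementation), so every call submits a job whose plan contains everything that feeds the DataSet up to that point.

public long count() throws Exception {
    final String id = new AbstractID().toString();
    // Adds a sink that registers a long accumulator under 'id'...
    output(new Utils.CountHelper<T>(id)).name("count()");
    // ...then runs the WHOLE plan that this DataSet depends on.
    final JobExecutionResult res = getExecutionEnvironment().execute();
    return res.<Long>getAccumulatorResult(id);
}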
>>>>>> It really seems that the DataSet stored in the result variable is always representing an increasing sequence of executions and not just the results of the last execution.
>>>>>>
>>>>>> Best regards,
>>>>>>
>>>>>> Miguel E. Coimbra
>>>>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>>>>> Skype: miguel.e.coimbra
>>>>>>
>>>>>> On 27 November 2017 at 22:56, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Miguel,
>>>>>>>
>>>>>>> I'm sorry but AFAIK, the situation has not changed.
>>>>>>>
>>>>>>> Is it possible that you are calling execute() multiple times?
>>>>>>> In that case, the 1-st and 2-nd graph would be recomputed before the 3-rd graph is computed.
>>>>>>> That would explain the increasing execution time of 15 seconds.
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>> 2017-11-26 17:45 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> I'm facing a problem in an algorithm where I would like to constantly update a DataSet representing a graph, perform some computation, output one or more DataSinks (such as a file on the local system) and then reuse the DataSet in the next iteration.
>>>>>>>> I want to avoid spilling the results to disk at the end of an iteration and reading them back in the next iteration - the graph is very big and I do not wish to incur that time overhead.
>>>>>>>> I want to reuse the full result DataSet of each iteration in the next one, and I want to save to disk a small percentage of the produced DataSet for each iteration.
>>>>>>>> The space complexity is rather constant - the number of edges in the graph increases by only 100 between iterations (an extremely low percentage of the original graph's edges), and the additions are obtained using env.fromCollection(edgesToAdd).
>>>>>>>>
>>>>>>>> Although I am using Flink's Gelly API for graphs, I have no problem working directly with the underlying vertex and edge DataSet elements.
>>>>>>>>
>>>>>>>> Two ways to do this occur to me, but it seems both are currently not supported in Flink, as per Vasia's answer to this Stack Overflow question [1]:
>>>>>>>>
>>>>>>>> «Unfortunately, it is not currently possible to output intermediate results from a bulk iteration. You can only output the final result at the end of the iteration. Also, as you correctly noticed, Flink cannot efficiently unroll a while-loop or for-loop, so that won't work either.»
>>>>>>>>
>>>>>>>> 1. I thought I could create a bulk iteration, perform the computation and, between iterations, output the result to the file system.
>>>>>>>> However, this is not possible, as per Vasia's answer, and it produces the following exception on execution when I try (for example, to calculate a centrality metric for every vertex and dump the results to disk), as expected based on that information:
>>>>>>>>
>>>>>>>> org.apache.flink.api.common.InvalidProgramException: A data set that is part of an iteration was used as a sink or action. Did you forget to close the iteration?
>>>>>>>>
>>>>>>>> 2. Using a for loop in my own program and triggering sequential Flink job executions.
>>>>>>>> Problem: in this scenario, while I am able to use a DataSet produced in an iteration's Flink job (and dump the desired output information to disk) and pass it to the next Flink job, the computation time increases constantly:
>>>>>>>> (I also tried manually starting a session which is kept open with env.startNewSession() before the loop - no impact)
>>>>>>>>
>>>>>>>> Initial graph has 33511 vertices and 411578 edges.
>>>>>>>> Added 113 vertices and 100 edges.
>>>>>>>> 1-th graph now has 33524 vertices and 411678 edges (2.543 s).
>>>>>>>> 1-th graph algorithm produced 33524 elements. (20.96 s).
>>>>>>>> Added 222 vertices and 200 edges.
>>>>>>>> 2-th graph now has 33536 vertices and 411778 edges (1.919 s).
>>>>>>>> 2-th graph algorithm produced 33536 elements. (35.913 s).
>>>>>>>> Added 326 vertices and 300 edges.
>>>>>>>> 3-th graph now has 33543 vertices and 411878 edges (1.825 s).
>>>>>>>> 3-th graph algorithm produced 33543 elements. (49.624 s).
>>>>>>>> Added 436 vertices and 400 edges.
>>>>>>>> 4-th graph now has 33557 vertices and 411978 edges (1.482 s).
>>>>>>>> 4-th graph algorithm produced 33557 elements. (66.209 s).
>>>>>>>>
>>>>>>>> Note that the number of elements in the output DataSet is equal to the number of vertices in the graph.
>>>>>>>> On iteration i in my program, the executed graph algorithm incorporates the result DataSet of iteration i - 1 by means of the g.joinWithVertices(previousResultDataSet, new RanksJoinFunction()) function.
>>>>>>>>
>>>>>>>> The VertexJoinFunction is a simple forwarding mechanism to set the previous values:
>>>>>>>>
>>>>>>>> @FunctionAnnotation.ForwardedFieldsFirst("*->*")
>>>>>>>> private static class RanksJoinFunction implements VertexJoinFunction<Double, Double> {
>>>>>>>>     @Override
>>>>>>>>     public Double vertexJoin(final Double vertexValue, final Double inputValue) throws Exception {
>>>>>>>>         return inputValue;
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>> I have also used Flink's plan visualizer to check for discrepancies between the first iteration and the tenth (for example), but the layout of the plan remains exactly the same, while the execution time continually increases for what should be the same amount of computation.
>>>>>>>>
>>>>>>>> Bottom line: I was hoping someone could tell me how to overcome the performance bottleneck using the sequential job approach, or how to enable the output of intermediate results using Flink's bulk iterations.
>>>>>>>> I believe others have stumbled upon this limitation before [2, 3].
>>>>>>>> I have tested this on a dual-core i7 with 8 GB RAM on Java 8 64-bit using a local environment:
>>>>>>>>
>>>>>>>> final Configuration conf = new Configuration();
>>>>>>>> final LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>>>>>>>> final ExecutionEnvironment env = lenv;
>>>>>>>> env.getConfig().disableSysoutLogging().setParallelism(1);
>>>>>>>>
>>>>>>>> I wish to execute on a cluster later on with a bigger dataset, so it would be essential to maximize the ability to reuse the DataSets that are distributed by the Flink runtime.
>>>>>>>> This would allow me to avoid the performance bottleneck that I described.
>>>>>>>> Hopefully someone may shed light on this.
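Incidentally, the growth in the timings above can be reproduced without Gelly at all. Below is a minimal, self-contained sketch of the same sequential pattern in the plain DataSet API (all names and sizes are made up for illustration): because current is never materialized between jobs, the i-th count() triggers a job whose plan contains every map step built so far, so each step pays for all previous ones again.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class RecomputationSketch {
    public static void main(final String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 'current' is only a plan node, not materialized data.
        DataSet<Long> current = env.generateSequence(1, 1_000_000);
        for (int i = 1; i <= 4; i++) {
            current = current.map(new MapFunction<Long, Long>() {
                @Override
                public Long map(final Long value) {
                    return value + 1;
                }
            });
            // Each count() submits a job containing all i map operators built so far.
            System.out.println(i + "-th step produced " + current.count() + " elements");
        }
    }
}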
>>>>>>>> Thanks for your attention.
>>>>>>>>
>>>>>>>> References:
>>>>>>>>
>>>>>>>> [1] https://stackoverflow.com/questions/37224140/possibility-of-saving-partial-outputs-from-bulk-iteration-in-flink-dataset/37352770#37352770
>>>>>>>>
>>>>>>>> [2] http://mail-archives.apache.org/mod_mbox/flink-dev/201605.mbox/%3CCY1PR0601MB1228D348B41B274B52BCD3AB94450@CY1PR0601MB1228.namprd06.prod.outlook.com%3E
>>>>>>>>
>>>>>>>> [3] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Intermediate-output-during-delta-iterations-td436.html
>>>>>>>>
>>>>>>>> Miguel E. Coimbra
>>>>>>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>>>>>>> Skype: miguel.e.coimbra