Hi Miguel,

if message size were the problem, the client would fail with an exception. What might be happening instead is that the client gets stuck while optimizing the program.

You could take a stack trace of the client process to identify where it gets stuck.

Best, Fabian
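The usual tool for this is jstack <pid> against the client JVM. As an in-process alternative, a minimal sketch (the watchdog class and the delay are illustrative, not from this thread):

    import java.util.Map;

    // Illustrative watchdog: dump all thread stack traces after a delay so a
    // hang inside execute() can be located; start it just before execute().
    public final class StackDumpWatchdog {

        public static void start(final long delayMillis) {
            final Thread watchdog = new Thread(() -> {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    return; // interrupted: the client finished in time
                }
                for (Map.Entry<Thread, StackTraceElement[]> entry
                        : Thread.getAllStackTraces().entrySet()) {
                    System.err.println(entry.getKey());
                    for (StackTraceElement frame : entry.getValue()) {
                        System.err.println("\tat " + frame);
                    }
                }
            });
            watchdog.setDaemon(true);
            watchdog.start();
        }
    }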
2017-12-06 3:01 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:

> Hello Fabian,
>
> Thanks for the help.
> I am interested in the duration of specific operators, so the fact that
> parts of the execution are pipelined is not a problem for me.
> From my understanding, the automated way to approach this is to run the
> Flink job with the web interface active, make a REST call for the
> appropriate job, and parse the JSON.
> I have implemented that, and my program is able to read the duration of
> specific operators.
>
> However, I am facing another problem whose cause I have not been able to
> pinpoint.
>
> Testing on 1.4.0-SNAPSHOT.
>
> When launching a cluster (start-local.bat on Windows or start-cluster.sh
> on Linux), the JobManager and the TaskManager(s) are launched and the web
> front end becomes active (I can access it via browser) - everything is ok
> so far.
> The problem occurs when the number of operators in the plan increases.
>
> Consider that I execute my algorithm three (3) times through a single
> Flink plan.
> Each of the three times, vertices and edges are added to the graph (via
> Gelly methods).
> This is defined in a for loop in my program. For each iteration:
>
> // I add 100 edges to the graph, decomposed into lists of vertices and edges
> final Graph<Long, NullValue, NullValue> newGraph = graph
>         .addVertices(verticesToAdd)
>         .addEdges(edgesToAdd);
>
> // Generate an identifier for the vertex counter.
> final String vid = new AbstractID().toString();
> newGraph.getVertices()
>         .output(new Utils.CountHelper<Vertex<Long, NullValue>>(vid))
>         .name("count()");
> vids.put(executionCounter, vid);
>
> // Generate an identifier for the edge counter.
> final String eid = new AbstractID().toString();
> newGraph.getEdges()
>         .output(new Utils.CountHelper<Edge<Long, NullValue>>(eid))
>         .name("count()");
> eids.put(executionCounter, eid);
>
> So far I have created 2 sinks in the current iteration in my regular Java
> program.
> Then I execute my graph algorithm with the previous results:
>
> // Execute the graph algorithm and output some of its results
> result = newGraph.run(new MyGraphAlgorithm<>(previousResults));
> result.sortPartition(1, Order.DESCENDING)
>         .setParallelism(1)
>         .first(1000)
>         .output(outputFormat)
>         .name("Store results");
> previousResults = result;
>
> This creates one additional sink via the output function.
> So for three (3) iterations, I define nine (9) sinks in the plan, call
> execute(), and afterward retrieve the contents of the sinks.
> This works fine so far.
>
> If I run with 10 iterations, I will be creating 30 sinks.
> The problem is that for 10 iterations, the Flink client program just
> hangs on the execute() call forever. Execution time should increase
> linearly with the number of iterations: with 3 iterations, execute()
> proceeds normally and takes around 20 seconds per iteration, so 3
> iterations take around 60 seconds and 10 should take around 3 minutes.
> After five hours, there was no progress.
>
> Furthermore, I checked the web monitor periodically and there was not a
> single job.
> It seems that the client program is simply not sending the job to the
> cluster if the job plan becomes too big.
> The exact same compiled program works with 3 iterations (via argument),
> but with 10 (via argument) it simply falls silent.
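For reference, a minimal sketch of the REST polling approach described above, assuming the job-details endpoint of the Flink 1.4 monitoring REST API (see [1] in Fabian's reply further down) and Jackson for JSON parsing; the class and method names are illustrative:

    import java.net.URL;
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    // Illustrative helper: read per-operator durations of a job from the
    // JobManager's monitoring REST API. The "vertices", "name" and
    // "duration" fields follow the 1.4 REST API documentation; baseUrl is
    // e.g. "http://localhost:8081".
    public final class OperatorDurations {

        public static void print(final String baseUrl, final String jobId) throws Exception {
            final JsonNode job = new ObjectMapper()
                    .readTree(new URL(baseUrl + "/jobs/" + jobId));
            for (final JsonNode vertex : job.get("vertices")) {
                System.out.printf("%s: %d ms%n",
                        vertex.get("name").asText(),
                        vertex.get("duration").asLong());
            }
        }
    }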
> I am trying to understand what may be the problem:
>
> - An internal limit on the number of data sinks or operators in the plan?
> - A limit on the message size preventing the client from sending the job?
>   (see: https://issues.apache.org/jira/browse/FLINK-2603)
>
> I have tried increasing akka.framesize to 256000kB, both in the server's
> flink-conf.yaml and in the client program when creating the remote
> environment:
>
> Configuration clientConfig = new Configuration();
> final String akkaFrameSize = "256000kB";
> ConfigOption<String> akkaConfig =
>         ConfigOptions.key(AkkaOptions.FRAMESIZE.key()).defaultValue(akkaFrameSize);
> clientConfig.setString(akkaConfig, akkaFrameSize);
> env = new RemoteEnvironment(remoteAddress, remotePort, clientConfig, jarFiles, null);
>
> I have run out of ideas with respect to the causes.
> Hoping you may be able to shed some light.
>
> Thank you for your time,
>
> Best regards,
>
> Miguel E. Coimbra
> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
> Skype: miguel.e.coimbra
>
> On 29 November 2017 at 10:23, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> The monitoring REST interface provides detailed stats about a job, its
>> tasks, and processing vertices, including their start and end times [1].
>>
>> However, it is not trivial to make sense of the execution times, because
>> Flink uses pipelined shuffles by default.
>> That means that the execution of multiple operators can overlap. For
>> example, the records produced by a GroupReduce can be processed by a
>> Map, shuffled, and sorted (for another GroupReduce) in a pipelined
>> fashion.
>> Hence, all these operations run at the same time. You can disable this
>> behavior to some extent by setting the execution mode to batched
>> shuffles [2].
>> However, this will likely have a negative impact on the overall
>> execution time.
>>
>> Best, Fabian
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/rest_api.html#details-of-a-running-or-completed-job
>> [2] https://stackoverflow.com/questions/33691612/apache-flink-stepwise-execution/33691957#33691957
>>
>> 2017-11-29 0:44 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>>
>>> Hello,
>>>
>>> You're right, I was overlooking that.
>>> With your suggestion, I now just define a different sink in each
>>> iteration of the loop.
>>> Then they all output to disk when executing a single bigger plan.
>>>
>>> I have one more question: I know I can retrieve the total time this
>>> single job takes to execute, but what if I want to know the time taken
>>> by specific operators in the DAG?
>>> Is there some functionality in the Flink Batch API akin to counting
>>> elements, but for measuring time instead?
>>> For example, if I am not mistaken, an operator can be executed in
>>> parallel or serially (with a parallelism of one).
>>>
>>> Is there a straightforward way to get the time taken by the operator's
>>> tasks? In a way that I could:
>>>
>>> a) just get the time of a single task (if running serially) to get the
>>> total operator execution time;
>>> b) know the time taken by each parallel component of the operator's
>>> execution, so I could know where and what the "lagging element" in the
>>> operator's execution was.
>>>
>>> Is this possible? I was hoping I could retrieve this information in the
>>> Java program itself and avoid processing logs.
>>>
>>> Thanks again.
>>>
>>> Best regards,
>>>
>>> Miguel E. Coimbra
>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>> Skype: miguel.e.coimbra
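A minimal sketch of the batched-shuffle setting Fabian mentions above (see [2] in his reply); ExecutionMode.BATCH trades overall runtime for operator executions that no longer overlap:

    import org.apache.flink.api.common.ExecutionMode;
    import org.apache.flink.api.java.ExecutionEnvironment;

    // Switch the data exchange mode from the default pipelined shuffles to
    // batched shuffles, so per-operator times are easier to attribute.
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setExecutionMode(ExecutionMode.BATCH);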
>>> On 28 November 2017 at 08:56, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> by calling result.count(), you compute the complete plan from the
>>>> beginning, and not just the operations you added since the last
>>>> execution.
>>>> Looking at the output you posted, each step takes about 15 seconds
>>>> (with about 5 seconds of initialization).
>>>> So the 20 seconds of the first step include initialization + 1st step.
>>>> The 35 seconds of the second step include initialization + 1st step +
>>>> 2nd step.
>>>> If you don't call count on the intermediate steps, you can compute the
>>>> 4th step in 65 seconds.
>>>>
>>>> Implementing a caching operator would be a pretty huge effort, because
>>>> you would need to touch code in many places such as the API,
>>>> optimizer, runtime, scheduling, etc.
>>>> The documentation you found should still be applicable. There have not
>>>> been major additions to the DataSet API and runtime in the last
>>>> releases.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2017-11-28 9:14 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>>>>
>>>>> Hello Fabian,
>>>>>
>>>>> Thank you for the reply.
>>>>> I was hoping the situation had in fact changed.
>>>>>
>>>>> As far as I know, I am not calling execute() directly even once - it
>>>>> is being called implicitly by the simple DataSink elements added to
>>>>> the plan through count():
>>>>>
>>>>> System.out.println(String.format("%d-th graph algorithm produced %d elements. (%d.%d s).",
>>>>>         executionCounter,
>>>>>         result.count(), // this triggers execution...
>>>>>         env.getLastJobExecutionResult().getNetRuntime(TimeUnit.SECONDS),
>>>>>         env.getLastJobExecutionResult().getNetRuntime(TimeUnit.MILLISECONDS) % 1000));
>>>>>
>>>>> I have taken a look at Flink's code base (e.g. how the dataflow DAG
>>>>> is processed with classes such as
>>>>> org.apache.flink.optimizer.traversals.GraphCreatingVisitor and
>>>>> org.apache.flink.api.java.operators.OperatorTranslation), but I'm not
>>>>> sure of the most direct way to achieve this.
>>>>> Perhaps I missed some online documentation that would help to get a
>>>>> grip on how to contribute to the different parts of Flink?
>>>>>
>>>>> I did find some information which hints at implementing this sort of
>>>>> thing (such as adding custom operators), but it was associated with
>>>>> an old version of Flink:
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.1/internals/add_operator.html
>>>>> However, as far as I know there is no equivalent page in the current
>>>>> online stable or snapshot documentation.
>>>>>
>>>>> What would be the best way to go about this?
>>>>>
>>>>> It really seems that the DataSet stored in the result variable always
>>>>> represents an increasing sequence of executions, and not just the
>>>>> results of the last execution.
>>>>>
>>>>> Best regards,
>>>>>
>>>>> Miguel E. Coimbra
>>>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>>>> Skype: miguel.e.coimbra
>>>>>
>>>>> On 27 November 2017 at 22:56, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>
>>>>>> Hi Miguel,
>>>>>>
>>>>>> I'm sorry, but AFAIK the situation has not changed.
>>>>>>
>>>>>> Is it possible that you are calling execute() multiple times?
>>>>>> In that case, the 1st and 2nd graphs would be recomputed before the
>>>>>> 3rd graph is computed.
>>>>>> That would explain the roughly 15-second increase in execution time
>>>>>> per step.
>>>>>>
>>>>>> Best, Fabian
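For reference, DataSet#count() submits a job eagerly, which is what causes the recomputation Fabian describes. The Utils.CountHelper pattern used earlier in the thread defers the counting sinks so that a single execute() covers the whole plan; a condensed sketch (the helper method name is illustrative):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.Utils;
    import org.apache.flink.util.AbstractID;

    // Illustrative helper: attach a counting sink without triggering
    // execution. DataSet#count() does the same internally, but then calls
    // execute() immediately, resubmitting the whole plan built so far.
    public static <T> String addCountSink(final DataSet<T> dataSet) {
        final String id = new AbstractID().toString();
        dataSet.output(new Utils.CountHelper<T>(id)).name("count()");
        return id;
    }

    // Later, after one env.execute() covering the complete plan:
    //   JobExecutionResult result = env.execute();
    //   long count = result.<Long>getAccumulatorResult(id);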
>>>>>> 2017-11-26 17:45 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I'm facing a problem in an algorithm where I would like to
>>>>>>> constantly update a DataSet representing a graph, perform some
>>>>>>> computation, output one or more DataSinks (such as a file on the
>>>>>>> local file system), and then reuse the DataSet in the next
>>>>>>> iteration.
>>>>>>> I want to avoid spilling the results to disk at the end of an
>>>>>>> iteration and reading them back in the next one - the graph is very
>>>>>>> big and I do not wish to incur that time overhead.
>>>>>>> I want to reuse the full result DataSet of each iteration in the
>>>>>>> next one, and I want to save a small percentage of the produced
>>>>>>> DataSet to disk in each iteration.
>>>>>>> The space complexity is roughly constant - the number of edges in
>>>>>>> the graph increases by only 100 between iterations (an extremely
>>>>>>> low percentage of the original graph's edges), obtained using
>>>>>>> env.fromCollection(edgesToAdd).
>>>>>>>
>>>>>>> Although I am using Flink's Gelly API for graphs, I have no problem
>>>>>>> working directly with the underlying vertex and edge DataSet
>>>>>>> elements.
>>>>>>>
>>>>>>> Two ways to do this occur to me, but it seems both are currently
>>>>>>> not supported in Flink, as per Vasia's answer to this Stack
>>>>>>> Overflow question [1]:
>>>>>>>
>>>>>>> «Unfortunately, it is not currently possible to output intermediate
>>>>>>> results from a bulk iteration. You can only output the final result
>>>>>>> at the end of the iteration. Also, as you correctly noticed, Flink
>>>>>>> cannot efficiently unroll a while-loop or for-loop, so that won't
>>>>>>> work either.»
>>>>>>>
>>>>>>> 1. I thought I could create a bulk iteration, perform the
>>>>>>> computation, and output the result to the file system between
>>>>>>> iterations.
>>>>>>> However, this is not possible, as per Vasia's answer, and when I
>>>>>>> try (for example, to calculate a centrality metric for every vertex
>>>>>>> and dump the results to disk), it produces the following exception
>>>>>>> on execution, as expected based on that information:
>>>>>>>
>>>>>>> org.apache.flink.api.common.InvalidProgramException: A data set
>>>>>>> that is part of an iteration was used as a sink or action. Did you
>>>>>>> forget to close the iteration?
>>>>>>>
>>>>>>> 2. Using a for loop in my own program and triggering sequential
>>>>>>> Flink job executions.
>>>>>>> Problem: in this scenario, while I am able to use a DataSet
>>>>>>> produced in an iteration's Flink job (dumping the desired output
>>>>>>> information to disk) and pass it to the next Flink job, the
>>>>>>> computation time increases constantly (I also tried manually
>>>>>>> starting a session which is kept open with env.startNewSession()
>>>>>>> before the loop - no impact):
>>>>>>>
>>>>>>> Initial graph has 33511 vertices and 411578 edges.
>>>>>>> Added 113 vertices and 100 edges.
>>>>>>> 1-th graph now has 33524 vertices and 411678 edges (2.543 s).
>>>>>>> 1-th graph algorithm produced 33524 elements. (20.96 s).
>>>>>>> Added 222 vertices and 200 edges.
>>>>>>> 2-th graph now has 33536 vertices and 411778 edges (1.919 s).
>>>>>>> 2-th graph algorithm produced 33536 elements. (35.913 s).
>>>>>>> Added 326 vertices and 300 edges.
>>>>>>> 3-th graph now has 33543 vertices and 411878 edges (1.825 s).
>>>>>>> 3-th graph algorithm produced 33543 elements. (49.624 s).
>>>>>>> Added 436 vertices and 400 edges.
>>>>>>> 4-th graph now has 33557 vertices and 411978 edges (1.482 s).
>>>>>>> 4-th graph algorithm produced 33557 elements. (66.209 s).
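A condensed sketch of the sequential-job loop behind these numbers (MyGraphAlgorithm, initialGraph, iterations, verticesToAdd and edgesToAdd are assumptions taken from this thread): each count() submits a separate job, and every job recomputes the plan from the original graph onward, which is why the reported times keep growing:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.graph.Graph;
    import org.apache.flink.graph.Vertex;
    import org.apache.flink.types.NullValue;

    // Inside a main(...) throws Exception; verticesToAdd/edgesToAdd are
    // assumed to hold the per-iteration vertex and edge lists.
    DataSet<Vertex<Long, Double>> previousResults = null;
    Graph<Long, NullValue, NullValue> graph = initialGraph;
    for (int i = 1; i <= iterations; i++) {
        graph = graph.addVertices(verticesToAdd.get(i)).addEdges(edgesToAdd.get(i));
        final DataSet<Vertex<Long, Double>> result =
                graph.run(new MyGraphAlgorithm<>(previousResults));
        // count() triggers a full job covering all i iterations built so far.
        System.out.printf("%d-th graph algorithm produced %d elements.%n", i, result.count());
        previousResults = result;
    }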
>>>>>>> Note that the number of elements in the output DataSet is equal to
>>>>>>> the number of vertices in the graph.
>>>>>>> On iteration i of my program, the executed graph algorithm
>>>>>>> incorporates the result DataSet of iteration i - 1 by means of the
>>>>>>> g.joinWithVertices(previousResultDataSet, new RanksJoinFunction())
>>>>>>> function.
>>>>>>>
>>>>>>> The VertexJoinFunction is a simple forwarding mechanism to set the
>>>>>>> previous values:
>>>>>>>
>>>>>>> @FunctionAnnotation.ForwardedFieldsFirst("*->*")
>>>>>>> private static class RanksJoinFunction implements VertexJoinFunction<Double, Double> {
>>>>>>>     @Override
>>>>>>>     public Double vertexJoin(final Double vertexValue, final Double inputValue) throws Exception {
>>>>>>>         return inputValue;
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> I have also used Flink's plan visualizer to check for discrepancies
>>>>>>> between the first iteration and the tenth (for example), but the
>>>>>>> layout of the plan remains exactly the same, while the execution
>>>>>>> time continually increases for what should be the same amount of
>>>>>>> computation.
>>>>>>>
>>>>>>> Bottom line: I was hoping someone could tell me how to overcome the
>>>>>>> performance bottleneck of the sequential-job approach, or how to
>>>>>>> enable the output of intermediate results using Flink's bulk
>>>>>>> iterations.
>>>>>>> I believe others have stumbled upon this limitation before [2, 3].
>>>>>>> I have tested this on a dual-core i7 with 8 GB RAM on 64-bit Java
>>>>>>> 8, using a local environment:
>>>>>>>
>>>>>>> final Configuration conf = new Configuration();
>>>>>>> final LocalEnvironment lenv = (LocalEnvironment)
>>>>>>>         ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>>>>>>> final ExecutionEnvironment env = lenv;
>>>>>>> env.getConfig().disableSysoutLogging().setParallelism(1);
>>>>>>>
>>>>>>> I wish to execute on a cluster later on with a bigger dataset, so
>>>>>>> it is essential to maximize the reuse of the DataSets that are
>>>>>>> distributed by the Flink runtime.
>>>>>>> This would allow me to avoid the performance bottleneck I
>>>>>>> described.
>>>>>>> Hopefully someone may shed light on this.
>>>>>>>
>>>>>>> Thanks for your attention.
>>>>>>>
>>>>>>> References:
>>>>>>>
>>>>>>> [1] https://stackoverflow.com/questions/37224140/possibility-of-saving-partial-outputs-from-bulk-iteration-in-flink-dataset/37352770#37352770
>>>>>>> [2] http://mail-archives.apache.org/mod_mbox/flink-dev/201605.mbox/%3CCY1PR0601MB1228D348B41B274B52BCD3AB94450@CY1PR0601MB1228.namprd06.prod.outlook.com%3E
>>>>>>> [3] http://apache-flink-user-mailing-list-archive.2336050.nabble.com/Intermediate-output-during-delta-iterations-td436.html
>>>>>>>
>>>>>>> Miguel E. Coimbra
>>>>>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>>>>>> Skype: miguel.e.coimbra