Hi Miguel,

If the message size were the problem, the client would fail with an
exception.
What might be happening is that the client gets stuck while optimizing
the program.

You could take a stack trace of the client process to identify where it
gets stuck.
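
For example, with the standard JDK tools (a sketch, assuming jps and
jstack are on the PATH; <pid> is the client JVM's process id):

jps -l                     # find the PID of the client JVM
jstack <pid> > client.txt  # dump all thread stacks to a file

If the client is stuck in the optimizer, the main thread should show
frames from the org.apache.flink.optimizer package.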

Best, Fabian

2017-12-06 3:01 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:

> Hello Fabian,
>
> Thanks for the help.
> I am interested in the duration of specific operators, so the fact that
> parts of the execution are pipelined is not a problem for me.
> From my understanding, the automated way to approach this is to run the
> Flink job with the web interface active and then make a REST call on the
> appropriate job and parse the JSON.
> I have implemented that and my program is able to read the duration of
> specific operators.
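>
> For reference, a rough sketch of that REST call (exception handling
> omitted; the host and port are from my local setup, and jobId is a
> variable holding the job's id as returned by the /joboverview listing).
> Per the docs, the returned JSON has a "vertices" array with "start-time",
> "end-time" and "duration" entries per operator:
>
> final java.net.URL url = new java.net.URL("http://localhost:8081/jobs/" + jobId);
> try (java.util.Scanner scanner =
>         new java.util.Scanner(url.openStream(), "UTF-8").useDelimiter("\\A")) {
>     final String json = scanner.hasNext() ? scanner.next() : "";
>     // Parse the "vertices" array of json for per-operator durations.
> }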
>
> However, I'm facing another problem whose cause I haven't been able to
> pinpoint.
>
> Testing on 1.4.0-SNAPSHOT.
>
> When launching a cluster (start-local.bat on Windows or start-cluster.sh
> on Linux), the JobManager and the TaskManager(s) are launched and the web
> front end becomes active (I can access it via browser) - everything is OK
> so far.
> The problem occurs when the number of operators in the plan increases.
>
> Consider that I execute my algorithm three (3) times through a single
> Flink plan.
> Each of the three times, vertices and edges are added to the graph (via
> Gelly methods).
> This is defined in a for loop in my program. For each iteration:
>
> // I add 100 edges to the graph, decomposed into a list of vertices and a list of edges.
> final Graph<Long, NullValue, NullValue> newGraph = graph
>     .addVertices(verticesToAdd)
>     .addEdges(edgesToAdd);
>
> // Generate an identifier for the vertex counter.
> final String vid = new AbstractID().toString();
> newGraph.getVertices()
>     .output(new Utils.CountHelper<Vertex<Long, NullValue>>(vid))
>     .name("count()");
> vids.put(executionCounter, vid);
>
> // Generate an identifier for the edge counter.
> final String eid = new AbstractID().toString();
> newGraph.getEdges()
>     .output(new Utils.CountHelper<Edge<Long, NullValue>>(eid))
>     .name("count()");
> eids.put(executionCounter, eid);
>
> So far I have created 2 sinks in the current iteration in my regular Java
> program.
> Then I execute my graph algorithm with the previous results:
>
> // Execute the graph algorithm and output some of its results
> result = newGraph.run(new MyGraphAlgorithm<>(previousResults));
> result.sortPartition(1, Order.DESCENDING)
>     .setParallelism(1)
>     .first(1000)
>     .output(outputFormat)
>     .name("Store results");
> previousResults = result;
>
> This creates one additional sink via the output function.
> So for three (3) iterations, I define nine (9) sinks in the plan, call
> execute() and afterward retrieve the contents of the sinks.
> This works fine so far.
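>
> For reference, after execute() I retrieve the counts from the
> JobExecutionResult roughly like this (a sketch, assuming executionCounter
> is a long; CountHelper publishes the count as a Long accumulator under
> the generated id, and vids maps the iteration number to that id):
>
> final JobExecutionResult jobResult = env.execute();
> for (final Map.Entry<Long, String> entry : vids.entrySet()) {
>     final long vertexCount = jobResult.<Long>getAccumulatorResult(entry.getValue());
>     System.out.println("Iteration " + entry.getKey() + ": " + vertexCount + " vertices");
> }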
>
> If I run with 10 iterations, I will be creating 30 sinks.
> The problem is that with 10 iterations, the Flink client program just
> hangs on the execute() call forever, even though execution time should
> only increase linearly with the number of iterations:
> with 3 iterations, execute() proceeds normally and takes around 20
> seconds per iteration, so 3 iterations take about 60 seconds and 10
> should take around 3 minutes.
> After five hours, there was no progress.
>
> Furthermore, I checked the web monitor periodically and there was not a
> single job.
> It seems that the client program is simply not sending the job to the
> cluster if the job plan becomes too big.
> The exact same compiled program, with 3 iterations (via argument) works,
> but with 10 (via argument) it simply falls silent.
>
> I am trying to understand what may be the problem:
>
> - An internal limit on the number of data sinks or operators in the plan?
>
> - A limit on the message size preventing the client from sending the job?
> (see: https://issues.apache.org/jira/browse/FLINK-2603 )
> I have tried increasing the akka.framesize to 256000kB in the Flink
> server flink-conf.yaml config and in the client program when creating the
> remote environment with:
>
> Configuration clientConfig = new Configuration();
> final String akkaFrameSize = "256000kB";
> ConfigOption<String> akkaConfig = ConfigOptions
>     .key(AkkaOptions.FRAMESIZE.key())
>     .defaultValue(akkaFrameSize);
> clientConfig.setString(akkaConfig, akkaFrameSize);
> env = new RemoteEnvironment(remoteAddress, remotePort, clientConfig,
>     jarFiles, null);
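>
> For completeness, the corresponding entry I set in the server's
> flink-conf.yaml (same value as in the code above):
>
> akka.framesize: 256000kB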
>
> I have run out of ideas with respect to the causes.
> Hoping you may be able to shed some light.
>
> Thank you for your time,
>
> Best regards,
>
> Miguel E. Coimbra
> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
> Skype: miguel.e.coimbra
>
> On 29 November 2017 at 10:23, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> The monitoring REST interface provides detailed stats about a job, its
>> tasks, and processing vertices, including their start and end times [1].
>>
>> However, it is not trivial to make sense of the execution times, because
>> Flink uses pipelined shuffles by default.
>> That means that the execution of multiple operators can overlap. For
>> example, the records that are produced by a GroupReduce can be processed
>> by a Map, shuffled, and sorted (for another GroupReduce) in a pipelined
>> fashion.
>> Hence, all these operations run at the same time. You can disable this
>> behavior to some extent by setting the execution mode to batched
>> shuffles [2].
>> However, this will likely have a negative impact on the overall
>> execution time.
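>>
>> A minimal sketch of that setting in the DataSet API (env being your
>> ExecutionEnvironment):
>>
>> // Exchange data in a batched fashion instead of pipelining it, so that
>> // consecutive operators run one after the other.
>> env.getConfig().setExecutionMode(ExecutionMode.BATCH);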
>>
>> Best, Fabian
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/rest_api.html#details-of-a-running-or-completed-job
>> [2] https://stackoverflow.com/questions/33691612/apache-flink-stepwise-execution/33691957#33691957
>>
>>
>>
>> 2017-11-29 0:44 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>>
>>> Hello,
>>>
>>> You're right, I was overlooking that.
>>> With your suggestion, I now just define a different sink in each
>>> iteration of the loop.
>>> Then they all output to disk when executing a single bigger plan.
>>>
>>> I have one more question: I know I can retrieve the total time this
>>> single job takes to execute, but what if I want to know the time taken
>>> by specific operators in the DAG?
>>> Is there some functionality in the Flink Batch API akin to counting
>>> elements but for measuring time instead?
>>> For example, if I am not mistaken, an operator can be executed in
>>> parallel or serially (with a parallelism of one).
>>>
>>> Is there a straightforward way to get the time taken by an operator's
>>> tasks, so that I could:
>>>
>>> a) just get the time of a single task (if running serially) to get the
>>> total operator execution time;
>>> b) know the time taken by each parallel component of the operator's
>>> execution so I could know where and what was the "lagging element" in the
>>> operator's execution.
>>>
>>> Is this possible? I was hoping I could retrieve this information in the
>>> Java program itself and avoid processing logs.
>>>
>>> Thanks again.
>>>
>>> Best regards,
>>>
>>>
>>> Miguel E. Coimbra
>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>> Skype: miguel.e.coimbra
>>>
>>> On 28 November 2017 at 08:56, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> by calling result.count(), you compute the complete plan from the
>>>> beginning, and not just the operations you added since the last
>>>> execution.
>>>> Looking at the output you posted, each step takes about 15 seconds
>>>> (with about 5 seconds of initialization).
>>>> So the 20 seconds of the first step include initialization + the 1st
>>>> step.
>>>> The 35 seconds of the second step include initialization, the 1st step
>>>> + the 2nd step.
>>>> If you don't call count on the intermediate steps, you can compute the
>>>> 4th step in 65 seconds.
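>>>>
>>>> If you need the intermediate counts anyway, you can defer them to a
>>>> single execute() call with accumulator sinks; this is a sketch of what
>>>> count() does internally (MyType stands for the element type of the
>>>> result data set):
>>>>
>>>> final String countId = new AbstractID().toString();
>>>> result.output(new Utils.CountHelper<MyType>(countId)).name("count()");
>>>> // ... define the sinks of the other steps, then execute once:
>>>> final JobExecutionResult res = env.execute();
>>>> final long count = res.<Long>getAccumulatorResult(countId);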
>>>>
>>>> Implementing a caching operator would be a pretty huge effort, because
>>>> you need to touch code in many places, such as the API, optimizer,
>>>> runtime, scheduling, etc.
>>>> The documentation you found should still be applicable. There haven't
>>>> been major additions to the DataSet API and runtime in the last
>>>> releases.
>>>>
>>>> Best, Fabian
>>>>
>>>>
>>>>
>>>> 2017-11-28 9:14 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>>>>
>>>>> Hello Fabian,
>>>>>
>>>>> Thank you for the reply.
>>>>> I was hoping the situation had in fact changed.
>>>>>
>>>>> As far as I know, I am not calling execute() directly even once - it
>>>>> is being called implicitly by simple DataSink elements added to the
>>>>> plan through count():
>>>>>
>>>>> System.out.println(String.format(
>>>>>     "%d-th graph algorithm produced %d elements. (%d.%d s).",
>>>>>     executionCounter,
>>>>>     result.count(), // this would trigger execution...
>>>>>     env.getLastJobExecutionResult().getNetRuntime(TimeUnit.SECONDS),
>>>>>     env.getLastJobExecutionResult().getNetRuntime(TimeUnit.MILLISECONDS) % 1000));
>>>>>
>>>>>
>>>>> I have taken a look at Flink's code base (e.g. how the dataflow DAG is
>>>>> processed with classes such as
>>>>> org.apache.flink.optimizer.traversals.GraphCreatingVisitor and
>>>>> org.apache.flink.api.java.operators.OperatorTranslation), but I'm not
>>>>> sure of the most direct way to achieve this.
>>>>> Perhaps I missed some online documentation that would help to get a
>>>>> grip on how to contribute to the different parts of Flink?
>>>>>
>>>>> I did find some information which hints at implementing this sort of
>>>>> thing (such as adding custom operators) but it was associated to an old
>>>>> version of Flink:
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.1/internals/add_operator.html
>>>>> However, as far as I know there is no equivalent page in the current
>>>>> online stable or snapshot documentation.
>>>>>
>>>>> What would be the best way to go about this?
>>>>>
>>>>> It really seems that the DataSet stored in the result variable always
>>>>> represents an increasing sequence of executions, and not just the
>>>>> results of the last execution.
>>>>>
>>>>>
>>>>>
>>>>> Best regards,
>>>>>
>>>>> Miguel E. Coimbra
>>>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>>>> Skype: miguel.e.coimbra
>>>>>
>>>>> On 27 November 2017 at 22:56, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>
>>>>>> Hi Miguel,
>>>>>>
>>>>>> I'm sorry but AFAIK, the situation has not changed.
>>>>>>
>>>>>> Is it possible that you are calling execute() multiple times?
>>>>>> In that case, the 1-st and 2-nd graph would be recomputed before the
>>>>>> 3-rd graph is computed.
>>>>>> That would explain the increasing execution time of 15 seconds.
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> 2017-11-26 17:45 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I'm facing a problem in an algorithm where I would like to
>>>>>>> constantly update a DataSet representing a graph, perform some
>>>>>>> computation, output to one or more DataSinks (such as a file on the
>>>>>>> local system) and then reuse the DataSet in the next iteration.
>>>>>>> I want to avoid spilling the results to disk at the end of an
>>>>>>> iteration and reading them back in the next one - the graph is very
>>>>>>> big and I do not wish to incur that time overhead.
>>>>>>> I want to reuse the full result DataSet of each iteration in the
>>>>>>> next one, and I want to save to disk only a small percentage of the
>>>>>>> produced DataSet in each iteration.
>>>>>>> The space complexity is rather constant - the number of edges in the
>>>>>>> graph increases by only 100 between iterations (an extremely low
>>>>>>> percentage of the original graph's edges), with the additions
>>>>>>> obtained using env.fromCollection(edgesToAdd).
>>>>>>>
>>>>>>> Although I am using Flink's Gelly API for graphs, I have no problem
>>>>>>> working directly with the underlying vertex and edge DataSet
>>>>>>> elements.
>>>>>>>
>>>>>>> Two ways to do this occur to me, but it seems both are currently not
>>>>>>> supported in Flink, as per Vasia's answer to this Stack Overflow
>>>>>>> question [1]:
>>>>>>>
>>>>>>> «Unfortunately, it is not currently possible to output intermediate
>>>>>>> results from a bulk iteration. You can only output the final result
>>>>>>> at the end of the iteration. Also, as you correctly noticed, Flink
>>>>>>> cannot efficiently unroll a while-loop or for-loop, so that won't
>>>>>>> work either.»
>>>>>>>
>>>>>>> *1.* I thought I could create a bulk iteration, perform the
>>>>>>> computation and, between iterations, output the result to the file
>>>>>>> system.
>>>>>>> However, as per Vasia's answer, this is not possible; when I try (for
>>>>>>> example, calculating a centrality metric for every vertex and dumping
>>>>>>> the results to disk), execution produces the following exception, as
>>>>>>> expected based on that information:
>>>>>>>
>>>>>>> org.apache.flink.api.common.InvalidProgramException: A data set
>>>>>>> that is part of an iteration was used as a sink or action. Did you
>>>>>>> forget to close the iteration?
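>>>>>>>
>>>>>>> For illustration, a minimal sketch of the shape that triggers this
>>>>>>> (UpdateRanks standing in for a hypothetical MapFunction):
>>>>>>>
>>>>>>> final IterativeDataSet<Vertex<Long, Double>> loop = ranks.iterate(10);
>>>>>>> final DataSet<Vertex<Long, Double>> step = loop.map(new UpdateRanks());
>>>>>>> step.writeAsCsv("/tmp/step"); // sink inside the open iteration
>>>>>>> final DataSet<Vertex<Long, Double>> out = loop.closeWith(step);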
>>>>>>>
>>>>>>> *2.* Using a for loop in my own program and triggering sequential
>>>>>>> Flink job executions.
>>>>>>> Problem: in this scenario, while I am able to use a DataSet produced
>>>>>>> in an iteration's Flink job (and dump the desired output information to
>>>>>>> disk) and pass it to the next Flink job, the computation time increases
>>>>>>> constantly:
>>>>>>> (I also tried manually starting a session which is kept open with
>>>>>>> env.startNewSession() before the loop - no impact)
>>>>>>>
>>>>>>> Initial graph has 33511 vertices and 411578 edges.
>>>>>>> Added 113 vertices and 100 edges.
>>>>>>> 1-th graph now has 33524 vertices and 411678 edges (2.543 s).
>>>>>>> 1-th graph algorithm produced 33524 elements (20.96 s).
>>>>>>> Added 222 vertices and 200 edges.
>>>>>>> 2-th graph now has 33536 vertices and 411778 edges (1.919 s).
>>>>>>> 2-th graph algorithm produced 33536 elements (35.913 s).
>>>>>>> Added 326 vertices and 300 edges.
>>>>>>> 3-th graph now has 33543 vertices and 411878 edges (1.825 s).
>>>>>>> 3-th graph algorithm produced 33543 elements (49.624 s).
>>>>>>> Added 436 vertices and 400 edges.
>>>>>>> 4-th graph now has 33557 vertices and 411978 edges (1.482 s).
>>>>>>> 4-th graph algorithm produced 33557 elements (66.209 s).
>>>>>>>
>>>>>>> Note that the number of elements in the output DataSet is equal to
>>>>>>> the number of vertices in the graph.
>>>>>>> On iteration i in my program, the executed graph algorithm
>>>>>>> incorporates the result DataSet of iteration i - 1 by means of the
>>>>>>> g.joinWithVertices(previousResultDataSet, new RanksJoinFunction())
>>>>>>> function.
>>>>>>>
>>>>>>> The VertexJoinFunction is a simple forwarding mechanism to set the
>>>>>>> previous values:
>>>>>>>
>>>>>>> @FunctionAnnotation.ForwardedFieldsFirst("*->*")
>>>>>>> private static class RanksJoinFunction
>>>>>>>         implements VertexJoinFunction<Double, Double> {
>>>>>>>     @Override
>>>>>>>     public Double vertexJoin(final Double vertexValue,
>>>>>>>             final Double inputValue) throws Exception {
>>>>>>>         return inputValue;
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> I have also used Flink's plan visualizer to check for discrepancies
>>>>>>> between the first iteration and the tenth (for example), but the
>>>>>>> layout of the plan remains exactly the same, while the execution time
>>>>>>> continually increases for what should be the same amount of
>>>>>>> computation.
>>>>>>>
>>>>>>> *Bottom-line:* I was hoping someone could tell me how to overcome
>>>>>>> the performance bottleneck using the sequential job approach, or how
>>>>>>> to enable the output of intermediate results using Flink's bulk
>>>>>>> iterations.
>>>>>>> I believe others have stumbled upon this limitation before [2, 3].
>>>>>>> I have tested this on a dual-core i7 with 8 GB of RAM on 64-bit Java
>>>>>>> 8, using a local environment:
>>>>>>>
>>>>>>> final Configuration conf = new Configuration();
>>>>>>> final LocalEnvironment lenv = (LocalEnvironment)
>>>>>>>     ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>>>>>>> final ExecutionEnvironment env = lenv;
>>>>>>> env.getConfig().disableSysoutLogging().setParallelism(1);
>>>>>>>
>>>>>>> I wish to execute on a cluster later on with a bigger dataset, so it
>>>>>>> is essential to maximize the ability to reuse the DataSets that are
>>>>>>> distributed by the Flink runtime.
>>>>>>> This would allow me to avoid the performance bottleneck that I
>>>>>>> described.
>>>>>>> Hopefully someone may shed light on this.
>>>>>>>
>>>>>>> Thanks for your attention.
>>>>>>>
>>>>>>>
>>>>>>> References:
>>>>>>>
>>>>>>> [1] https://stackoverflow.com/questions/37224140/possibility-of-saving-partial-outputs-from-bulk-iteration-in-flink-dataset/37352770#37352770
>>>>>>>
>>>>>>> [2] http://mail-archives.apache.org/mod_mbox/flink-dev/201605.mbox/%3CCY1PR0601MB1228D348B41B274B52BCD3AB94450@CY1PR0601MB1228.namprd06.prod.outlook.com%3E
>>>>>>>
>>>>>>> [3] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Intermediate-output-during-delta-iterations-td436.html
>>>>>>>
>>>>>>> Miguel E. Coimbra
>>>>>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>>>>>> Skype: miguel.e.coimbra
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
