[ 
https://issues.apache.org/jira/browse/FLINK-30531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated FLINK-30531:
-----------------------------
    Description: 
Benchmark results show that Flink is more than 3X slower than Spark at executing simple programs. For example, running the following program with object re-use enabled and parallelism=1 takes roughly 120 seconds on a MacBook, whereas Spark runs the same logic on the same machine in less than 40 seconds.
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
env.setParallelism(1);
env.fromSequence(1, 1000000000L)
        .map(x -> x)
        .map(x -> x)
        .map(x -> x)
        .map(x -> x)
        .map(x -> x)
        .addSink(new DiscardingSink<>());
env.execute();
{code}
 

It turns out that the operator chain overhead introduced by Flink is surprisingly high. For the example program above, the Flink runtime goes through a call stack of 24 functions to produce one element, and each extra map(...) operation adds 3 more functions to the call stack.

Here are the 24 functions in the call stack:
{code:bash}
StreamTask#processInput
StreamOneInputProcessor#processInput
StreamTaskSourceInput#emitNext
SourceOperator#emitNext
IteratorSourceReaderBase#pollNext
SourceOutputWithWatermarks#collect
AsyncDataOutputToOutput#emitRecord
ChainingOutput#collect
StreamMap#processElement
CountingOutput#collect
ChainingOutput#collect
StreamMap#processElement
CountingOutput#collect
ChainingOutput#collect
StreamMap#processElement
CountingOutput#collect
ChainingOutput#collect
StreamMap#processElement
CountingOutput#collect
ChainingOutput#collect
StreamMap#processElement
CountingOutput#collect
ChainingOutput#collect
StreamSink#processElement
{code}
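To make the per-map cost concrete, the following is a heavily simplified sketch of the chaining structure (not the actual Flink classes, which wrap records in StreamRecord and have different signatures). It shows why every chained map() contributes three frames: ChainingOutput#collect forwards the record to the operator, StreamMap#processElement applies the user function, and CountingOutput#collect updates the output metric before handing the record to the next ChainingOutput.
{code:java}
import java.util.function.UnaryOperator;

// Heavily simplified sketch of the operator chain; not the real Flink classes.
interface Output<T> {
    void collect(T record);
}

// Frame 1 per map(): forwards the record to the chained operator.
class ChainingOutput<T> implements Output<T> {
    private final StreamMap<T> operator;
    ChainingOutput(StreamMap<T> operator) { this.operator = operator; }
    public void collect(T record) { operator.processElement(record); }
}

// Frame 2 per map(): applies the user's map function.
class StreamMap<T> {
    private final UnaryOperator<T> userFunction;
    private final Output<T> output; // a CountingOutput in the chain
    StreamMap(UnaryOperator<T> userFunction, Output<T> output) {
        this.userFunction = userFunction;
        this.output = output;
    }
    void processElement(T record) { output.collect(userFunction.apply(record)); }
}

// Frame 3 per map(): updates the numRecordsOut metric, then calls the next
// ChainingOutput (or the sink output), starting the next group of three frames.
class CountingOutput<T> implements Output<T> {
    private final Output<T> next;
    private long numRecordsOut;
    CountingOutput(Output<T> next) { this.next = next; }
    public void collect(T record) { numRecordsOut++; next.collect(record); }
}
{code}
This ChainingOutput -> StreamMap -> CountingOutput triple appears five times in the stack above (once per map()), followed by a final ChainingOutput -> StreamSink pair for the sink.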
 

Given the observation above, here is why Flink is slow for programs with little per-record computation:
 * For each record produced, the Flink runtime currently incurs an unnecessarily deep function call stack. The depth can reach 24 frames or more even for a simple program consisting of only 5 map() operations.
 * Java's maximum inline level is less than 18 [2]. The operator chain call stack can easily exceed this limit and prevent the JVM from inlining the calls, which further increases the per-call overhead (a small sketch after this list shows how to check the limit on a given JVM).
 * Function calls that are not inlined also require a virtual table lookup, since most of the methods involved are virtual.
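As a quick sanity check of this limit on a given setup (assuming a HotSpot-based JDK; the hypothetical PrintMaxInlineLevel class below just reads the MaxInlineLevel flag via the HotSpot diagnostic MXBean, see [2]), one can print the effective value:
{code:java}
import com.sun.management.HotSpotDiagnosticMXBean;
import java.lang.management.ManagementFactory;

// Prints the effective inline depth limit of the running JVM.
// Requires a HotSpot-based JDK; MaxInlineLevel is a HotSpot flag.
public class PrintMaxInlineLevel {
    public static void main(String[] args) {
        HotSpotDiagnosticMXBean hotspot =
                ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
        System.out.println(hotspot.getVMOption("MaxInlineLevel"));
    }
}
{code}
The same value can also be printed from the command line with java -XX:+PrintFlagsFinal -version.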

Given the above explanation of the performance issue, here are ideas for reducing Flink's runtime overhead:
 * Update SourceOperator#emitNext() to push records to the DataOutput in a while loop. This can reduce the depth of the call stack needed to produce a record by 3 functions. See FLINK-30533 for more information.
 * Fuse some of the chained classes (e.g. ChainingOutput, StreamMap, CountingOutput) to reduce the call stack depth added by each extra operation (e.g. map(...)); a sketch of this idea follows the list.
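As an illustration of the fusion idea, here is a hypothetical FusedMapOutput (not an existing Flink class, just a sketch of the intent): it performs the work of ChainingOutput, StreamMap, and CountingOutput inside a single collect() call, so each additional map() would add one stack frame instead of three.
{code:java}
import java.util.function.UnaryOperator;

// Hypothetical sketch only, not an existing Flink class: fuses the record
// forwarding, the user map function, and the output counting into one call.
class FusedMapOutput<T> {
    private final UnaryOperator<T> userFunction; // the user's map() function
    private final FusedMapOutput<T> next;        // next fused output, or null at the end of the chain
    private long numRecordsOut;                  // the metric CountingOutput maintains today

    FusedMapOutput(UnaryOperator<T> userFunction, FusedMapOutput<T> next) {
        this.userFunction = userFunction;
        this.next = next;
    }

    void collect(T record) {
        numRecordsOut++;                        // counting (CountingOutput)
        T mapped = userFunction.apply(record);  // user function (StreamMap)
        if (next != null) {
            next.collect(mapped);               // forward (ChainingOutput)
        }
    }
}
{code}
With 5 chained map() calls this would put roughly 5 collect() frames between the source and the sink instead of the 15 ChainingOutput/StreamMap/CountingOutput frames shown in the stack above.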

[1] [https://arxiv.org/pdf/1610.09166.pdf]

[2] [https://bugs.openjdk.org/browse/JDK-8234863]

 

 

 


> Reduce operator chain call stack depth
> --------------------------------------
>
>                 Key: FLINK-30531
>                 URL: https://issues.apache.org/jira/browse/FLINK-30531
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Task
>            Reporter: Dong Lin
>            Assignee: Dong Lin
>            Priority: Major
>


