[ https://issues.apache.org/jira/browse/FLINK-30531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dong Lin updated FLINK-30531:
-----------------------------
Description:
Benchmark results show that Flink is more than 3X slower than Spark at executing simple programs. For example, if we run the following program with object reuse enabled and parallelism=1, it takes roughly 120 sec on a MacBook, whereas Spark takes less than 40 sec to run the same logic on the same machine.
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);               // parallelism=1
env.getConfig().enableObjectReuse(); // object reuse enabled
env.fromSequence(1, 1000000000L)
    .map(x -> x)
    .map(x -> x)
    .map(x -> x)
    .map(x -> x)
    .map(x -> x)
    .addSink(new DiscardingSink<>()); // addSink returns a DataStreamSink, not a DataStream
env.execute();
{code}

It turns out that the operator chain overhead introduced by Flink is surprisingly high. For the above example program, the Flink runtime goes through a call stack of 24 functions to produce 1 element, and each extra map(...) operation adds 3 more functions to the call stack.

Here are the 24 functions in the call stack:
{code:bash}
StreamTask#processInput
StreamOneInputProcessor#processInput
StreamTaskSourceInput#emitNext
SourceOperator#emitNext
IteratorSourceReaderBase#pollNext
SourceOutputWithWatermarks#collect
AsyncDataOutputToOutput#emitRecord
ChainingOutput#collect
StreamMap#processElement
CountingOutput#collect
ChainingOutput#collect
StreamMap#processElement
CountingOutput#collect
ChainingOutput#collect
StreamMap#processElement
CountingOutput#collect
ChainingOutput#collect
StreamMap#processElement
CountingOutput#collect
ChainingOutput#collect
StreamMap#processElement
CountingOutput#collect
ChainingOutput#collect
StreamSink#processElement
{code}

Given these observations, here is why Flink is slow for programs with low per-record computation overhead:
* For each record produced, the Flink runtime currently incurs an unnecessarily deep call stack: 24 functions for a simple program consisting of 5 map() operations, growing by 3 with every additional map().
* Java's maximum inline level is less than 18 [2]: HotSpot's default MaxInlineLevel is 15 as of JDK 14, and was 9 before. An operator chain's call stack can easily exceed this limit, which prevents the JIT compiler from inlining the deeper calls and further increases the function call overhead.
* Function calls that are not inlined require a virtual table lookup, since most of these functions are virtual.

Given the above explanations of the performance issue, here are some ideas to reduce Flink's runtime overhead:
* Update SourceOperator#emitNext() to push records to DataOutput in a while loop. This can reduce the depth of the call stack needed to produce a record by 3 functions. See FLINK-30533 for more information.
* Fuse some functions (e.g. ChainingOutput, StreamMap, CountingOutput) to reduce the call stack depth added by each extra operation (e.g. map(...)).

[1] [https://arxiv.org/pdf/1610.09166.pdf]
[2] [https://bugs.openjdk.org/browse/JDK-8234863]

was:
Benchmark results show that Flink is more than 3X slower than Spark at executing simple programs. For example, if we run the following program with object reuse enabled and parallelism=1, it takes roughly 120 sec on a MacBook, whereas Spark takes less than 40 sec to run the same logic on the same machine.
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);               // parallelism=1
env.getConfig().enableObjectReuse(); // object reuse enabled
env.fromSequence(1, 1000000000L)
    .map(x -> x)
    .map(x -> x)
    .map(x -> x)
    .map(x -> x)
    .map(x -> x)
    .addSink(new DiscardingSink<>()); // addSink returns a DataStreamSink, not a DataStream
env.execute();
{code}

It turns out that the operator chain overhead introduced by Flink is surprisingly high. For the above example program, the Flink runtime goes through a call stack of 24 functions to produce 1 element, and each extra map(...) operation adds 3 more functions to the call stack.
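The claim that each extra map(...) adds 3 frames can be reproduced with a small toy model in plain Java. The classes below are hypothetical, heavily simplified stand-ins that only borrow the names of the classes in the call stack (ChainingOutput, a StreamMap-like MapOperator, CountingOutput); they are not Flink's implementations. The sink records the stack depth at which a record arrives. A hypothetical FusedMapOutput, one possible reading of the "fuse some functions" idea, shows how merging the three roles into one method would change the per-map cost.

{code:java}
import java.util.function.LongUnaryOperator;

public class ChainDepthDemo {
    // Hypothetical, simplified stand-in for an operator output (primitive longs only).
    interface Output {
        void collect(long value);
    }

    // Sink that records the stack depth at which a record arrives.
    static final class DepthRecordingSink implements Output {
        int depth;

        @Override
        public void collect(long value) {
            depth = Thread.currentThread().getStackTrace().length;
        }
    }

    // Unfused stage, frame 1 of 3: forwards the record to the chained operator.
    static final class ChainingOutput implements Output {
        private final MapOperator operator;
        ChainingOutput(MapOperator operator) { this.operator = operator; }
        @Override public void collect(long value) { operator.processElement(value); }
    }

    // Unfused stage, frame 2 of 3: applies the user function (plays the StreamMap role).
    static final class MapOperator {
        private final LongUnaryOperator fn;
        private final Output out;
        MapOperator(LongUnaryOperator fn, Output out) { this.fn = fn; this.out = out; }
        void processElement(long value) { out.collect(fn.applyAsLong(value)); }
    }

    // Unfused stage, frame 3 of 3: counts the record, then forwards it to the next stage.
    static final class CountingOutput implements Output {
        private final Output next;
        long count;
        CountingOutput(Output next) { this.next = next; }
        @Override public void collect(long value) { count++; next.collect(value); }
    }

    // Hypothetical fused stage: counts, applies the function and forwards in a single frame.
    static final class FusedMapOutput implements Output {
        private final LongUnaryOperator fn;
        private final Output next;
        long count;
        FusedMapOutput(LongUnaryOperator fn, Output next) { this.fn = fn; this.next = next; }
        @Override public void collect(long value) { count++; next.collect(fn.applyAsLong(value)); }
    }

    // Builds `maps` identity stages in front of the sink, 3 frames per stage.
    static Output unfusedChain(int maps, Output sink) {
        Output head = sink;
        for (int i = 0; i < maps; i++) {
            head = new ChainingOutput(new MapOperator(x -> x, new CountingOutput(head)));
        }
        return head;
    }

    // Builds the same chain from fused stages, 1 frame per stage.
    static Output fusedChain(int maps, Output sink) {
        Output head = sink;
        for (int i = 0; i < maps; i++) {
            head = new FusedMapOutput(x -> x, head);
        }
        return head;
    }

    // Pushes one record through the chain and returns the depth observed by the sink.
    static int sinkDepth(Output head, DepthRecordingSink sink) {
        head.collect(1L);
        return sink.depth;
    }

    public static void main(String[] args) {
        for (int maps = 1; maps <= 5; maps++) {
            DepthRecordingSink unfusedSink = new DepthRecordingSink();
            DepthRecordingSink fusedSink = new DepthRecordingSink();
            int unfused = sinkDepth(unfusedChain(maps, unfusedSink), unfusedSink);
            int fused = sinkDepth(fusedChain(maps, fusedSink), fusedSink);
            System.out.printf("maps=%d unfused depth=%d fused depth=%d%n", maps, unfused, fused);
        }
    }
}
{code}

Absolute depths depend on the JVM and launcher, so only the differences are meaningful: in this model every unfused map adds 3 frames and every fused map adds 1, so 5 unfused maps contribute 15 frames where 5 fused ones contribute 5. Thread#getStackTrace also reports frames the JIT has inlined, so this measures logical call depth rather than the cost left after inlining.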
Here are the 24 functions in the call stack:
{code:bash}
StreamTask#processInput
StreamOneInputProcessor#processInput
StreamTaskSourceInput#emitNext
SourceOperator#emitNext
IteratorSourceReaderBase#pollNext
SourceOutputWithWatermarks#collect
AsyncDataOutputToOutput#emitRecord
ChainingOutput#collect
StreamMap#processElement
CountingOutput#collect
ChainingOutput#collect
StreamMap#processElement
CountingOutput#collect
ChainingOutput#collect
StreamMap#processElement
CountingOutput#collect
ChainingOutput#collect
StreamMap#processElement
CountingOutput#collect
ChainingOutput#collect
StreamMap#processElement
CountingOutput#collect
ChainingOutput#collect
StreamSink#processElement
{code}

Given these observations, here is why Flink is slow for programs with low per-record computation overhead:
* For each record produced, the Flink runtime currently incurs an unnecessarily deep call stack: 24 functions for a simple program consisting of 5 map() operations, growing by 3 with every additional map().
* Java's maximum inline level is less than 18 [2]. An operator chain's call stack can easily exceed this limit, which prevents the JIT compiler from inlining the deeper calls and further increases the function call overhead.
* Function calls that are not inlined require a virtual table lookup, since most of these functions are virtual.

Given the above explanations of the performance issue, here are some ideas to reduce Flink's runtime overhead:
* Update IteratorSourceReaderBase#pollNext() to push records to ReaderOutput in a while loop. This can remove 4 function calls from the call stack needed to produce a record. See FLINK-30533 for more information.
* Fuse some functions (e.g. ChainingOutput, StreamMap, CountingOutput) to reduce the call stack depth added by each extra operation (e.g. map(...)).
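The while-loop idea (applied to SourceOperator#emitNext() in the updated description, and to IteratorSourceReaderBase#pollNext() in this earlier version) can be sketched with a similar toy model. Everything below is hypothetical and is not Flink's API: a pull-style source emits one record per emitNext() call through two intermediate per-record layers, while a push-style source drains records in a while loop and calls the output directly. The maxRecordsPerCall budget is an assumption standing in for whatever condition a real task would use to hand control back (for example, to process checkpoints or timers).

{code:java}
import java.util.PrimitiveIterator;
import java.util.stream.LongStream;

public class PushLoopDemo {
    // Hypothetical stand-in for the output that receives records from a source.
    interface DataOutput { void emitRecord(long value); }

    // Hypothetical source interface: returns true if more records may follow.
    interface Source { boolean emitNext(DataOutput out); }

    // Counts records and captures the stack depth at which each record arrives.
    static final class RecordingOutput implements DataOutput {
        long records;
        int depth;

        @Override
        public void emitRecord(long value) {
            records++;
            depth = Thread.currentThread().getStackTrace().length;
        }
    }

    // Pull style: each emitNext() call emits at most one record, passing it
    // through two extra per-record layers (pollNext -> collect).
    static final class OneRecordPerCallSource implements Source {
        private final PrimitiveIterator.OfLong it;
        OneRecordPerCallSource(PrimitiveIterator.OfLong it) { this.it = it; }

        @Override public boolean emitNext(DataOutput out) { return pollNext(out); }

        private boolean pollNext(DataOutput out) {
            if (!it.hasNext()) return false;
            collect(it.nextLong(), out);
            return it.hasNext();
        }

        private void collect(long value, DataOutput out) { out.emitRecord(value); }
    }

    // Push style: one emitNext() call pushes up to maxRecordsPerCall records
    // straight to the output in a while loop.
    static final class LoopingSource implements Source {
        private final PrimitiveIterator.OfLong it;
        private final int maxRecordsPerCall; // hypothetical budget before yielding

        LoopingSource(PrimitiveIterator.OfLong it, int maxRecordsPerCall) {
            if (maxRecordsPerCall <= 0) throw new IllegalArgumentException("budget must be positive");
            this.it = it;
            this.maxRecordsPerCall = maxRecordsPerCall;
        }

        @Override
        public boolean emitNext(DataOutput out) {
            int emitted = 0;
            while (emitted < maxRecordsPerCall && it.hasNext()) {
                out.emitRecord(it.nextLong());
                emitted++;
            }
            return it.hasNext();
        }
    }

    // Hypothetical task loop: calls emitNext() until the source is exhausted and
    // returns how many times the outer layers were entered.
    static int runToEnd(Source source, DataOutput out) {
        int calls = 0;
        boolean more = true;
        while (more) {
            more = source.emitNext(out);
            calls++;
        }
        return calls;
    }

    public static void main(String[] args) {
        RecordingOutput pullOut = new RecordingOutput();
        int pullCalls = runToEnd(new OneRecordPerCallSource(LongStream.rangeClosed(1, 10).iterator()), pullOut);
        RecordingOutput pushOut = new RecordingOutput();
        int pushCalls = runToEnd(new LoopingSource(LongStream.rangeClosed(1, 10).iterator(), 4), pushOut);
        System.out.printf("pull: records=%d, emitNext calls=%d%n", pullOut.records, pullCalls);
        System.out.printf("push: records=%d, emitNext calls=%d%n", pushOut.records, pushCalls);
        System.out.printf("push reaches the output %d frames shallower%n", pullOut.depth - pushOut.depth);
    }
}
{code}

For 10 records and a budget of 4, the pull-style source needs 10 emitNext() calls while the push-style source needs 3, and each record reaches the output 2 frames shallower in this model. The actual savings in Flink (3 or 4 functions, according to the two versions of the description) depend on which layers the loop bypasses.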
[1] [https://arxiv.org/pdf/1610.09166.pdf]
[2] [https://bugs.openjdk.org/browse/JDK-8234863]


> Reduce operator chain call stack depth
> --------------------------------------
>
>                 Key: FLINK-30531
>                 URL: https://issues.apache.org/jira/browse/FLINK-30531
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Task
>            Reporter: Dong Lin
>            Assignee: Dong Lin
>            Priority: Major


--
This message was sent by Atlassian Jira
(v8.20.10#820010)