[ https://issues.apache.org/jira/browse/FLINK-35766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-35766:
-----------------------------------
    Labels: pull-request-available  (was: )

> When the job contains many YieldingOperatorFactory instances, compiling the JobGraph hangs
> ------------------------------------------------------------------------------------------
>
>                 Key: FLINK-35766
>                 URL: https://issues.apache.org/jira/browse/FLINK-35766
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>            Reporter: Junrui Li
>            Assignee: Junrui Li
>            Priority: Major
>              Labels: pull-request-available
>
> When a job contains YieldingOperatorFactory instances, compiling the JobGraph
> has a very high time complexity (O(N!)). As a result, job compilation hangs
> while creating operator chains when the job contains many
> YieldingOperatorFactory instances (e.g., more than 30).
> This bug is rare, but some of our users run SQL jobs in production that contain
> many LookupJoins, which use YieldingOperatorFactory. A simple reproducible case
> is as follows:
> {code:java}
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
> import org.junit.jupiter.api.Test;
>
> // YieldingTestOperatorFactory (not shown) is a test operator factory that
> // implements YieldingOperatorFactory.
> @Test
> void test() {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
>     DataStream<Long> stream =
>             env.fromSource(
>                             new NumberSequenceSource(0, 10),
>                             WatermarkStrategy.noWatermarks(),
>                             "input")
>                     .map((x) -> x);
>     // add 32 YieldingOperatorFactory instances
>     for (int i = 0; i < 32; i++) {
>         stream =
>                 stream.transform(
>                         "test", BasicTypeInfo.LONG_TYPE_INFO, new YieldingTestOperatorFactory<>());
>     }
>     stream.addSink(new DiscardingSink<>());
>     // hangs while creating operator chains
>     env.getStreamGraph().getJobGraph();
> }
> {code}
> The cause is that chainability results for edges are not cached, so each time a
> YieldingOperatorFactory is encountered, the generator traverses backward through
> the upstream operators again (see code:
> [https://github.com/apache/flink/blob/90fc679df073754b93eb5c220373daad7dca0a32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L1602]).
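>
> A minimal sketch of this repeated work, and of how caching removes it, is given
> here. It is a hypothetical, simplified model, not Flink's code and not the fix
> proposed for this issue; ChainabilityModel, isChainable and headOperator are
> illustrative names. Nodes 0..n form a linear pipeline in which every node after
> the source is a yielding operator, the source is not a legacy source (like
> NumberSequenceSource in the reproducer), and checking an edge backtraces the head
> of the upstream chain, which in turn re-checks the upstream edges. Without caching,
> this model performs 2^n - 1 chainability evaluations when every edge is checked
> once; with per-edge and per-node caches it performs n. The exact growth in
> StreamingJobGraphGenerator may differ from this model.
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
>
> // Hypothetical, simplified model of a chainability check (not Flink's actual code).
> // Nodes 0..n form a linear pipeline; edge k connects node k-1 to node k, and every
> // node 1..n is a yielding operator.
> public class ChainabilityModel {
>     private final boolean useCache;
>     private final boolean[] legacySource; // legacySource[i]: is node i a legacy source?
>     private final Map<Integer, Boolean> chainableCache = new HashMap<>(); // edge -> chainable
>     private final Map<Integer, Integer> headCache = new HashMap<>(); // node -> chain head
>     private long evaluations = 0; // number of isChainable bodies actually executed
>
>     public ChainabilityModel(int n, boolean useCache) {
>         this.useCache = useCache;
>         // All false: node 0 is a non-legacy source, like NumberSequenceSource.
>         this.legacySource = new boolean[n + 1];
>     }
>
>     // The downstream node of every edge is yielding, so the edge is chainable only
>     // if the head of the upstream chain is not a legacy source.
>     public boolean isChainable(int edge) {
>         if (useCache && chainableCache.containsKey(edge)) {
>             return chainableCache.get(edge);
>         }
>         evaluations++;
>         boolean result = !legacySource[headOperator(edge - 1)];
>         if (useCache) {
>             chainableCache.put(edge, result);
>         }
>         return result;
>     }
>
>     // Backtraces the head of the chain that `node` belongs to. Node k >= 1 has a
>     // single input edge, edge k. isChainable(node) already backtraces through
>     // node - 1 and the recursive call backtraces again, so without caching the
>     // work doubles at every step.
>     private int headOperator(int node) {
>         if (useCache && headCache.containsKey(node)) {
>             return headCache.get(node);
>         }
>         int head = (node >= 1 && isChainable(node)) ? headOperator(node - 1) : node;
>         if (useCache) {
>             headCache.put(node, head);
>         }
>         return head;
>     }
>
>     // Checks every edge once, in pipeline order, and returns the evaluation count.
>     public static long countEvaluations(int n, boolean useCache) {
>         ChainabilityModel model = new ChainabilityModel(n, useCache);
>         for (int edge = 1; edge <= n; edge++) {
>             model.isChainable(edge);
>         }
>         return model.evaluations;
>     }
>
>     public static void main(String[] args) {
>         System.out.println("without cache: " + countEvaluations(20, false)); // 1048575
>         System.out.println("with cache:    " + countEvaluations(20, true)); // 20
>     }
> }
> {code}
> With n = 32, as in the reproducer, the uncached model would need 2^32 - 1 (about
> 4.3 billion) evaluations, while the cached one needs 32.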



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
