[ https://issues.apache.org/jira/browse/FLINK-35766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-35766:
-----------------------------------
    Labels: pull-request-available  (was: )

When the job contains many YieldingOperatorFactory instances, compiling the JobGraph hangs
------------------------------------------------------------------------------------------

                Key: FLINK-35766
                URL: https://issues.apache.org/jira/browse/FLINK-35766
            Project: Flink
         Issue Type: Bug
         Components: Runtime / Coordination
           Reporter: Junrui Li
           Assignee: Junrui Li
           Priority: Major
             Labels: pull-request-available

When a job contains YieldingOperatorFactory instances, the time complexity of compiling the JobGraph is very high (O(N!)). As a result, compilation hangs while creating operator chains once the job contains many YieldingOperatorFactory instances (e.g., more than 30).

This bug is very rare, but some of our users run SQL jobs in production that contain many LookupJoins, each of which uses a YieldingOperatorFactory. A simple reproducible case is as follows:

{code:java}
@Test
void test() {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
    DataStream<Long> stream =
            env.fromSource(
                            new NumberSequenceSource(0, 10),
                            WatermarkStrategy.noWatermarks(),
                            "input")
                    .map((x) -> x);
    // add 32 YieldingOperatorFactory instances, one after another
    for (int i = 0; i < 32; i++) {
        stream =
                stream.transform(
                        "test", BasicTypeInfo.LONG_TYPE_INFO, new YieldingTestOperatorFactory<>());
    }
    stream.addSink(new DiscardingSink<>());
    // hangs here while the operator chains are created
    env.getStreamGraph().getJobGraph();
}
{code}

The reason is that the result of checking whether an edge is chainable is not cached, so every YieldingOperatorFactory that is encountered triggers a fresh backward traversal of the upstream graph (see code: [https://github.com/apache/flink/blob/90fc679df073754b93eb5c220373daad7dca0a32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L1602]).

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
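To illustrate why a missing cache for the chainability check makes the backward traversal blow up, here is a minimal toy model, not Flink's code. It assumes a linear pipeline in which node 0 is the source and every later node is a yielding operator, that an edge into a yielding operator is chainable only if the head of the upstream chain is not a legacy source, and that finding that head recursively asks whether each upstream edge is chainable. The class `ChainableMemoSketch` and its methods (`isChainable`, `headOperator`, `checkAllEdges`) are hypothetical names chosen for this sketch and are not part of the StreamingJobGraphGenerator API.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical toy model (not Flink's StreamingJobGraphGenerator): nodes 0..n-1 form a
 * linear pipeline, node 0 is the source and every other node is a "yielding" operator.
 */
class ChainableMemoSketch {
    private final int numNodes;
    private final boolean sourceIsLegacy;
    private final boolean useCache;
    // Memoized chainability of the edge (target - 1) -> target, keyed by target node.
    private final Map<Integer, Boolean> chainableCache = new HashMap<>();
    // How many times the chainability check was actually computed (cache misses).
    private long evaluations = 0;

    ChainableMemoSketch(int numNodes, boolean sourceIsLegacy, boolean useCache) {
        this.numNodes = numNodes;
        this.sourceIsLegacy = sourceIsLegacy;
        this.useCache = useCache;
    }

    /** Is the edge (target - 1) -> target chainable? */
    boolean isChainable(int target) {
        if (useCache) {
            Boolean cached = chainableCache.get(target);
            if (cached != null) {
                return cached;
            }
        }
        evaluations++;
        // Assumed rule: a yielding operator must not be chained to a legacy source,
        // so we walk backwards to find the head of the upstream chain.
        boolean result = !isLegacySource(headOperator(target - 1));
        if (useCache) {
            // Plain get/put rather than computeIfAbsent: the computation above
            // recursively inserts other keys into this same map.
            chainableCache.put(target, result);
        }
        return result;
    }

    /** Walks backwards while the incoming edge is chainable; returns the chain head. */
    int headOperator(int node) {
        if (node > 0 && isChainable(node)) {
            return headOperator(node - 1);
        }
        return node;
    }

    boolean isLegacySource(int node) {
        return node == 0 && sourceIsLegacy;
    }

    /** Checks every edge once, as a chain-building pass would; returns the evaluation count. */
    long checkAllEdges() {
        for (int target = 1; target < numNodes; target++) {
            isChainable(target);
        }
        return evaluations;
    }

    public static void main(String[] args) {
        int numNodes = 21; // source + 20 yielding operators
        long uncached = new ChainableMemoSketch(numNodes, false, false).checkAllEdges();
        long cached = new ChainableMemoSketch(numNodes, false, true).checkAllEdges();
        System.out.println("uncached evaluations: " + uncached); // 1048575 (2^20 - 1)
        System.out.println("cached evaluations:   " + cached); // 20
    }
}
```

In this simplified model the uncached evaluation count doubles with every additional yielding operator (2^(n-1) - 1 checks for n nodes), while caching each edge's result brings it down to one check per edge. `headOperator` still walks back through cached entries, so the cached version does quadratic work overall; caching chain heads as well would reduce that further. The sketch only illustrates the caching idea and does not claim to mirror the fix in the linked pull request.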