This is an automated email from the ASF dual-hosted git repository.

yunfengzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a7d9de2b60 [flink] Prohibit custom shuffle lookup join in adaptive 
parallelism mode (#7504)
a7d9de2b60 is described below

commit a7d9de2b608d0c9ba0dd0c01ddce18d0441efa11
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Mon Mar 23 17:35:41 2026 +0800

    [flink] Prohibit custom shuffle lookup join in adaptive parallelism mode 
(#7504)
---
 .../paimon/flink/sink/AdaptiveParallelism.java     |  6 +++-
 .../paimon/flink/source/BaseDataTableSource.java   | 16 ++++++++++
 .../lookup/LookupJoinBucketShuffleITCase.java      | 34 ++++++++++++++++++++++
 .../apache/paimon/flink/util/AbstractTestBase.java |  6 +++-
 4 files changed, 60 insertions(+), 2 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AdaptiveParallelism.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AdaptiveParallelism.java
index 0fd7cf9f35..ffc01a1b0f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AdaptiveParallelism.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AdaptiveParallelism.java
@@ -27,7 +27,11 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 public class AdaptiveParallelism {
 
     public static boolean isEnabled(StreamExecutionEnvironment env) {
-        return 
env.getConfiguration().get(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_ENABLED);
+        return isEnabled(env.getConfiguration());
+    }
+
+    public static boolean isEnabled(ReadableConfig config) {
+        return 
config.get(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_ENABLED);
     }
 
     /**
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
index 5227fa0d27..943c4b582c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
@@ -27,6 +27,7 @@ import 
org.apache.paimon.flink.lookup.partitioner.BucketIdExtractor;
 import org.apache.paimon.flink.lookup.partitioner.BucketShufflePartitioner;
 import org.apache.paimon.flink.lookup.partitioner.BucketShuffleStrategy;
 import org.apache.paimon.flink.lookup.partitioner.ShuffleStrategy;
+import org.apache.paimon.flink.sink.AdaptiveParallelism;
 import org.apache.paimon.flink.utils.RuntimeContextUtils;
 import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.options.Options;
@@ -40,6 +41,7 @@ import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.utils.Projection;
 
+import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -69,6 +71,8 @@ import java.util.OptionalLong;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
 import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
 import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_ASYNC;
@@ -279,6 +283,18 @@ public abstract class BaseDataTableSource extends 
FlinkTableSource
         int numBuckets;
         ShuffleStrategy strategy = null;
         if (useCustomShuffle) {
+            try {
+                checkArgument(
+                        this.context
+                                        .getConfiguration()
+                                        .get(RUNTIME_MODE)
+                                        .equals(RuntimeExecutionMode.STREAMING)
+                                || 
!AdaptiveParallelism.isEnabled(this.context.getConfiguration()),
+                        "Custom shuffle lookup join is not supported in 
adaptive parallelism mode.");
+            } catch (NoClassDefFoundError ignored) {
+                // before 1.17, there is no adaptive parallelism
+            }
+
             numBuckets = table.store().options().bucket();
             BucketIdExtractor extractor =
                     new BucketIdExtractor(
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupJoinBucketShuffleITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupJoinBucketShuffleITCase.java
index a4e485d829..4ba3f7d6bc 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupJoinBucketShuffleITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupJoinBucketShuffleITCase.java
@@ -19,7 +19,10 @@
 package org.apache.paimon.flink.lookup;
 
 import org.apache.paimon.flink.CatalogITCaseBase;
+import org.apache.paimon.utils.BlockingIterator;
 
+import org.apache.flink.configuration.BatchExecutionOptions;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.types.Row;
 import org.junit.jupiter.api.Test;
@@ -28,6 +31,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** ITCase for custom lookup shuffle. */
 public class LookupJoinBucketShuffleITCase extends CatalogITCaseBase {
@@ -189,6 +193,36 @@ public class LookupJoinBucketShuffleITCase extends 
CatalogITCaseBase {
         testBucketNumberCases(query);
     }
 
+    @Test
+    public void testBucketShuffleNotAllowedWithAQE() {
+        String nonPrimaryKeyDimTable = createNonPrimaryKeyDimTable("col1");
+        String query =
+                ("SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1, 
D.col2 FROM T JOIN DIM "
+                                + "for system_time as of T.proc_time AS D ON 
T.col1 = D.col1")
+                        .replace("DIM", nonPrimaryKeyDimTable);
+
+        
tEnv.getConfig().set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_ENABLED, 
true);
+
+        if (isFlinkVersionGreaterThanOrEqualTo("2.0")) {
+            assertThatThrownBy(
+                            () ->
+                                    
BlockingIterator.of(tEnv.executeSql(query).collect())
+                                            .collect(ROW_NUMBER))
+                    .rootCause()
+                    .isInstanceOf(IllegalArgumentException.class)
+                    .hasMessage(
+                            "Custom shuffle lookup join is not supported in 
adaptive parallelism mode.");
+        } else {
+            // Custom shuffle lookup join is not supported in Flink 1.x.
+            assertThatThrownBy(
+                            () ->
+                                    
BlockingIterator.of(tEnv.executeSql(query).collect())
+                                            .collect(ROW_NUMBER))
+                    .isInstanceOf(ValidationException.class)
+                    .hasMessage("Unknown join strategy : LOOKUP");
+        }
+    }
+
     private void testBucketNumberCases(String query) throws Exception {
         List<Row> groundTruthRows = getGroundTruthRows();
         // Test the case that bucket number = lookup parallelism.
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
index e9fb0a55c6..ba0ca781e6 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
@@ -56,8 +56,12 @@ import java.util.UUID;
 public class AbstractTestBase {
 
     protected static boolean isFlink2_1OrAbove() {
+        return isFlinkVersionGreaterThanOrEqualTo("2.1");
+    }
+
+    protected static boolean isFlinkVersionGreaterThanOrEqualTo(String 
baseFlinkVersion) {
         String flinkVersion = System.getProperty("test.flink.main.version");
-        return flinkVersion.compareTo("2.1") >= 0;
+        return flinkVersion.compareTo(baseFlinkVersion) >= 0;
     }
 
     private static final int DEFAULT_PARALLELISM = 16;

Reply via email to