This is an automated email from the ASF dual-hosted git repository.
yunfengzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a7d9de2b60 [flink] Prohibit custom shuffle lookup join in adaptive parallelism mode (#7504)
a7d9de2b60 is described below
commit a7d9de2b608d0c9ba0dd0c01ddce18d0441efa11
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Mon Mar 23 17:35:41 2026 +0800
[flink] Prohibit custom shuffle lookup join in adaptive parallelism mode (#7504)
---
.../paimon/flink/sink/AdaptiveParallelism.java | 6 +++-
.../paimon/flink/source/BaseDataTableSource.java | 16 ++++++++++
.../lookup/LookupJoinBucketShuffleITCase.java | 34 ++++++++++++++++++++++
.../apache/paimon/flink/util/AbstractTestBase.java | 6 +++-
4 files changed, 60 insertions(+), 2 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AdaptiveParallelism.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AdaptiveParallelism.java
index 0fd7cf9f35..ffc01a1b0f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AdaptiveParallelism.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AdaptiveParallelism.java
@@ -27,7 +27,11 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class AdaptiveParallelism {
public static boolean isEnabled(StreamExecutionEnvironment env) {
-        return env.getConfiguration().get(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_ENABLED);
+ return isEnabled(env.getConfiguration());
+ }
+
+ public static boolean isEnabled(ReadableConfig config) {
+        return config.get(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_ENABLED);
}
/**
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
index 5227fa0d27..943c4b582c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
@@ -27,6 +27,7 @@ import org.apache.paimon.flink.lookup.partitioner.BucketIdExtractor;
import org.apache.paimon.flink.lookup.partitioner.BucketShufflePartitioner;
import org.apache.paimon.flink.lookup.partitioner.BucketShuffleStrategy;
import org.apache.paimon.flink.lookup.partitioner.ShuffleStrategy;
+import org.apache.paimon.flink.sink.AdaptiveParallelism;
import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.Options;
@@ -40,6 +41,7 @@ import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.utils.Projection;
+import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -69,6 +71,8 @@ import java.util.OptionalLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_ASYNC;
@@ -279,6 +283,18 @@ public abstract class BaseDataTableSource extends FlinkTableSource
int numBuckets;
ShuffleStrategy strategy = null;
if (useCustomShuffle) {
+ try {
+ checkArgument(
+ this.context
+ .getConfiguration()
+ .get(RUNTIME_MODE)
+ .equals(RuntimeExecutionMode.STREAMING)
+                                || !AdaptiveParallelism.isEnabled(this.context.getConfiguration()),
+                        "Custom shuffle lookup join is not supported in adaptive parallelism mode.");
+ } catch (NoClassDefFoundError ignored) {
+ // before 1.17, there is no adaptive parallelism
+ }
+
numBuckets = table.store().options().bucket();
BucketIdExtractor extractor =
new BucketIdExtractor(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupJoinBucketShuffleITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupJoinBucketShuffleITCase.java
index a4e485d829..4ba3f7d6bc 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupJoinBucketShuffleITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupJoinBucketShuffleITCase.java
@@ -19,7 +19,10 @@
package org.apache.paimon.flink.lookup;
import org.apache.paimon.flink.CatalogITCaseBase;
+import org.apache.paimon.utils.BlockingIterator;
+import org.apache.flink.configuration.BatchExecutionOptions;
+import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;
@@ -28,6 +31,7 @@ import java.util.ArrayList;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** ITCase for custom lookup shuffle. */
public class LookupJoinBucketShuffleITCase extends CatalogITCaseBase {
@@ -189,6 +193,36 @@ public class LookupJoinBucketShuffleITCase extends CatalogITCaseBase {
testBucketNumberCases(query);
}
+ @Test
+ public void testBucketShuffleNotAllowedWithAQE() {
+ String nonPrimaryKeyDimTable = createNonPrimaryKeyDimTable("col1");
+ String query =
+ ("SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1,
D.col2 FROM T JOIN DIM "
+ + "for system_time as of T.proc_time AS D ON
T.col1 = D.col1")
+ .replace("DIM", nonPrimaryKeyDimTable);
+
+        tEnv.getConfig().set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_ENABLED, true);
+
+ if (isFlinkVersionGreaterThanOrEqualTo("2.0")) {
+ assertThatThrownBy(
+ () ->
+                                BlockingIterator.of(tEnv.executeSql(query).collect())
+                                        .collect(ROW_NUMBER))
+ .rootCause()
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(
+ "Custom shuffle lookup join is not supported in
adaptive parallelism mode.");
+ } else {
+ // Custom shuffle lookup join is not supported in Flink 1.x.
+ assertThatThrownBy(
+ () ->
+                                BlockingIterator.of(tEnv.executeSql(query).collect())
+                                        .collect(ROW_NUMBER))
+ .isInstanceOf(ValidationException.class)
+ .hasMessage("Unknown join strategy : LOOKUP");
+ }
+ }
+
private void testBucketNumberCases(String query) throws Exception {
List<Row> groundTruthRows = getGroundTruthRows();
// Test the case that bucket number = lookup parallelism.
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
index e9fb0a55c6..ba0ca781e6 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
@@ -56,8 +56,12 @@ import java.util.UUID;
public class AbstractTestBase {
protected static boolean isFlink2_1OrAbove() {
+ return isFlinkVersionGreaterThanOrEqualTo("2.1");
+ }
+
+    protected static boolean isFlinkVersionGreaterThanOrEqualTo(String baseFlinkVersion) {
String flinkVersion = System.getProperty("test.flink.main.version");
- return flinkVersion.compareTo("2.1") >= 0;
+ return flinkVersion.compareTo(baseFlinkVersion) >= 0;
}
private static final int DEFAULT_PARALLELISM = 16;