This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch release-1.4
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit d3589fa11b18d73b88fa4f01b5f40b21dc647713
Author: Faiz <[email protected]>
AuthorDate: Thu Apr 9 22:24:24 2026 +0800

    [flink] fix compatibility issues of StreamExecutionEnv (#7615)
---
 .../flink/action/RemoveUnexistingFilesAction.java  |  4 ++-
 .../paimon/flink/btree/BTreeIndexTopoBuilder.java  |  6 +++-
 .../flink/globalindex/GenericIndexTopoBuilder.java |  7 ++--
 .../utils/StreamExecutionEnvironmentUtils.java     | 41 ++++++++++++++++++++++
 .../utils/StreamExecutionEnvironmentUtils.java     | 40 +++++++++++++++++++++
 5 files changed, 94 insertions(+), 4 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java
index c67ddd7ea3..5639da0871 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.flink.sink.Committable;
 import org.apache.paimon.flink.sink.CommittableTypeInfo;
 import org.apache.paimon.flink.utils.BoundedOneInputOperator;
+import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataIncrement;
@@ -108,7 +109,8 @@ public class RemoveUnexistingFilesAction extends 
TableActionBase {
         List<BinaryRow> binaryPartitions = 
fileStoreTable.newScan().listPartitions();
 
         SingleOutputStreamOperator<byte[]> source =
-                env.fromData(
+                StreamExecutionEnvironmentUtils.fromData(
+                                env,
                                 binaryPartitions.stream()
                                         .map(BinaryRow::toBytes)
                                         .collect(Collectors.toList()),
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
index 35830faac1..dbdfdc8fe0 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
@@ -34,6 +34,7 @@ import org.apache.paimon.flink.sorter.TableSortInfo;
 import org.apache.paimon.flink.sorter.TableSorter;
 import org.apache.paimon.flink.utils.BoundedOneInputOperator;
 import org.apache.paimon.flink.utils.JavaTypeInfo;
+import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
 import org.apache.paimon.globalindex.GlobalIndexParallelWriter;
 import org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder;
 import org.apache.paimon.globalindex.btree.BTreeIndexOptions;
@@ -211,7 +212,10 @@ public class BTreeIndexTopoBuilder {
         parallelism = Math.min(parallelism, maxParallelism);
 
         DataStream<Split> sourceStream =
-                env.fromData(new JavaTypeInfo<>(Split.class), 
rangeSplits.toArray(new Split[0]))
+                StreamExecutionEnvironmentUtils.fromData(
+                                env,
+                                new JavaTypeInfo<>(Split.class),
+                                rangeSplits.toArray(new Split[0]))
                         .name("Global Index Source " + " range=" + range)
                         .setParallelism(1);
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
index 865280763c..bcbabac793 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
@@ -28,6 +28,7 @@ import 
org.apache.paimon.flink.sink.NoopCommittableStateManager;
 import org.apache.paimon.flink.sink.StoreCommitter;
 import org.apache.paimon.flink.utils.BoundedOneInputOperator;
 import org.apache.paimon.flink.utils.JavaTypeInfo;
+import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
 import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
 import org.apache.paimon.globalindex.ResultEntry;
 import org.apache.paimon.index.IndexFileMeta;
@@ -179,7 +180,8 @@ public class GenericIndexTopoBuilder {
         ReadBuilder readBuilder = 
table.newReadBuilder().withReadType(projectedRowType);
 
         DataStream<ShardTask> source =
-                env.fromData(
+                StreamExecutionEnvironmentUtils.fromData(
+                                env,
                                 new JavaTypeInfo<>(ShardTask.class),
                                 shardTasks.toArray(new ShardTask[0]))
                         .name("Generic Index Source")
@@ -201,7 +203,8 @@ public class GenericIndexTopoBuilder {
         if (!deletedIndexEntries.isEmpty()) {
             List<Committable> deleteCommittables = 
createDeleteCommittables(deletedIndexEntries);
             DataStream<Committable> deletes =
-                    env.fromData(
+                    StreamExecutionEnvironmentUtils.fromData(
+                                    env,
                                     new CommittableTypeInfo(),
                                     deleteCommittables.toArray(new 
Committable[0]))
                             .name("Index Delete Source")
diff --git 
a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/utils/StreamExecutionEnvironmentUtils.java
 
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/utils/StreamExecutionEnvironmentUtils.java
new file mode 100644
index 0000000000..c602bd3162
--- /dev/null
+++ 
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/utils/StreamExecutionEnvironmentUtils.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Utility methods about {@link StreamExecutionEnvironment} to resolve compatibility issues.
+ *
+ * <p>This is the copy located under the {@code paimon-flink1-common} module. Both {@code
+ * fromData} overloads create the source through {@link
+ * StreamExecutionEnvironment#fromCollection(Collection, TypeInformation)} instead of calling
+ * {@code env.fromData} themselves. NOTE(review): presumably {@code env.fromData} is not
+ * available on every Flink 1.x release this module targets; confirm against the supported
+ * version matrix.
+ *
+ * <ul>
+ *   <li>{@code fromData(env, typeInfo, data...)}: wraps the varargs array with {@link
+ *       Arrays#asList(Object[])} and hands that list, together with {@code typeInfo}, to {@code
+ *       env.fromCollection}.
+ *   <li>{@code fromData(env, data, typeInfo)}: forwards {@code data} and {@code typeInfo} to
+ *       {@code env.fromCollection} unchanged.
+ * </ul>
+ *
+ * <p>Each overload returns the {@link DataStreamSource} produced by {@code env}. Note the
+ * differing parameter order of the two overloads: the varargs form takes the type information
+ * first, the collection form takes it last.
+ */
+public class StreamExecutionEnvironmentUtils {
+
+    @SafeVarargs
+    public static <T> DataStreamSource<T> fromData(
+            StreamExecutionEnvironment env, TypeInformation<T> typeInfo, T... 
data) {
+        return env.fromCollection(Arrays.asList(data), typeInfo);
+    }
+
+    public static <T> DataStreamSource<T> fromData(
+            StreamExecutionEnvironment env, Collection<T> data, 
TypeInformation<T> typeInfo) {
+        return env.fromCollection(data, typeInfo);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/utils/StreamExecutionEnvironmentUtils.java
 
b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/utils/StreamExecutionEnvironmentUtils.java
new file mode 100644
index 0000000000..1fb5edf496
--- /dev/null
+++ 
b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/utils/StreamExecutionEnvironmentUtils.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.Collection;
+
+/**
+ * Utility methods about {@link StreamExecutionEnvironment} to resolve compatibility issues.
+ *
+ * <p>This is the copy located under the {@code paimon-flink2-common} module. Both {@code
+ * fromData} overloads delegate directly to the {@code fromData} methods of the given
+ * environment, so this class only fixes the call signature that shared code uses.
+ *
+ * <ul>
+ *   <li>{@code fromData(env, typeInfo, data...)}: calls {@code env.fromData(typeInfo, data)}
+ *       with the varargs array passed through as-is.
+ *   <li>{@code fromData(env, data, typeInfo)}: calls {@code env.fromData(data, typeInfo)} with
+ *       the collection passed through as-is.
+ * </ul>
+ *
+ * <p>Each overload returns the {@link DataStreamSource} produced by {@code env}. Note the
+ * differing parameter order of the two overloads: the varargs form takes the type information
+ * first, the collection form takes it last.
+ */
+public class StreamExecutionEnvironmentUtils {
+
+    @SafeVarargs
+    public static <T> DataStreamSource<T> fromData(
+            StreamExecutionEnvironment env, TypeInformation<T> typeInfo, T... 
data) {
+        return env.fromData(typeInfo, data);
+    }
+
+    public static <T> DataStreamSource<T> fromData(
+            StreamExecutionEnvironment env, Collection<T> data, 
TypeInformation<T> typeInfo) {
+        return env.fromData(data, typeInfo);
+    }
+}

Reply via email to