This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4557599a9a [flink] fix compatibility issues of StreamExecutionEnv
(#7615)
4557599a9a is described below
commit 4557599a9ae6ea8e84365b66fb5f6f91882d85e8
Author: Faiz <[email protected]>
AuthorDate: Thu Apr 9 22:24:24 2026 +0800
[flink] fix compatibility issues of StreamExecutionEnv (#7615)
---
.../flink/action/RemoveUnexistingFilesAction.java | 4 ++-
.../paimon/flink/btree/BTreeIndexTopoBuilder.java | 6 +++-
.../flink/globalindex/GenericIndexTopoBuilder.java | 7 ++--
.../utils/StreamExecutionEnvironmentUtils.java | 41 ++++++++++++++++++++++
.../utils/StreamExecutionEnvironmentUtils.java | 40 +++++++++++++++++++++
5 files changed, 94 insertions(+), 4 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java
index c67ddd7ea3..5639da0871 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.CommittableTypeInfo;
import org.apache.paimon.flink.utils.BoundedOneInputOperator;
+import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
@@ -108,7 +109,8 @@ public class RemoveUnexistingFilesAction extends
TableActionBase {
List<BinaryRow> binaryPartitions =
fileStoreTable.newScan().listPartitions();
SingleOutputStreamOperator<byte[]> source =
- env.fromData(
+ StreamExecutionEnvironmentUtils.fromData(
+ env,
binaryPartitions.stream()
.map(BinaryRow::toBytes)
.collect(Collectors.toList()),
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
index 35830faac1..dbdfdc8fe0 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
@@ -34,6 +34,7 @@ import org.apache.paimon.flink.sorter.TableSortInfo;
import org.apache.paimon.flink.sorter.TableSorter;
import org.apache.paimon.flink.utils.BoundedOneInputOperator;
import org.apache.paimon.flink.utils.JavaTypeInfo;
+import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
import org.apache.paimon.globalindex.GlobalIndexParallelWriter;
import org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder;
import org.apache.paimon.globalindex.btree.BTreeIndexOptions;
@@ -211,7 +212,10 @@ public class BTreeIndexTopoBuilder {
parallelism = Math.min(parallelism, maxParallelism);
DataStream<Split> sourceStream =
- env.fromData(new JavaTypeInfo<>(Split.class),
rangeSplits.toArray(new Split[0]))
+ StreamExecutionEnvironmentUtils.fromData(
+ env,
+ new JavaTypeInfo<>(Split.class),
+ rangeSplits.toArray(new Split[0]))
.name("Global Index Source " + " range=" + range)
.setParallelism(1);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
index 865280763c..bcbabac793 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
@@ -28,6 +28,7 @@ import
org.apache.paimon.flink.sink.NoopCommittableStateManager;
import org.apache.paimon.flink.sink.StoreCommitter;
import org.apache.paimon.flink.utils.BoundedOneInputOperator;
import org.apache.paimon.flink.utils.JavaTypeInfo;
+import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
import org.apache.paimon.globalindex.ResultEntry;
import org.apache.paimon.index.IndexFileMeta;
@@ -179,7 +180,8 @@ public class GenericIndexTopoBuilder {
ReadBuilder readBuilder =
table.newReadBuilder().withReadType(projectedRowType);
DataStream<ShardTask> source =
- env.fromData(
+ StreamExecutionEnvironmentUtils.fromData(
+ env,
new JavaTypeInfo<>(ShardTask.class),
shardTasks.toArray(new ShardTask[0]))
.name("Generic Index Source")
@@ -201,7 +203,8 @@ public class GenericIndexTopoBuilder {
if (!deletedIndexEntries.isEmpty()) {
List<Committable> deleteCommittables =
createDeleteCommittables(deletedIndexEntries);
DataStream<Committable> deletes =
- env.fromData(
+ StreamExecutionEnvironmentUtils.fromData(
+ env,
new CommittableTypeInfo(),
deleteCommittables.toArray(new
Committable[0]))
.name("Index Delete Source")
diff --git
a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/utils/StreamExecutionEnvironmentUtils.java
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/utils/StreamExecutionEnvironmentUtils.java
new file mode 100644
index 0000000000..c602bd3162
--- /dev/null
+++
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/utils/StreamExecutionEnvironmentUtils.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/** Utility methods about {@link StreamExecutionEnvironment} to resolve
compatibility issues. */
+public class StreamExecutionEnvironmentUtils {
+
+ @SafeVarargs
+ public static <T> DataStreamSource<T> fromData(
+ StreamExecutionEnvironment env, TypeInformation<T> typeInfo, T...
data) {
+ return env.fromCollection(Arrays.asList(data), typeInfo);
+ }
+
+ public static <T> DataStreamSource<T> fromData(
+ StreamExecutionEnvironment env, Collection<T> data,
TypeInformation<T> typeInfo) {
+ return env.fromCollection(data, typeInfo);
+ }
+}
diff --git
a/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/utils/StreamExecutionEnvironmentUtils.java
b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/utils/StreamExecutionEnvironmentUtils.java
new file mode 100644
index 0000000000..1fb5edf496
--- /dev/null
+++
b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/utils/StreamExecutionEnvironmentUtils.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.Collection;
+
+/** Utility methods about {@link StreamExecutionEnvironment} to resolve
compatibility issues. */
+public class StreamExecutionEnvironmentUtils {
+
+ @SafeVarargs
+ public static <T> DataStreamSource<T> fromData(
+ StreamExecutionEnvironment env, TypeInformation<T> typeInfo, T...
data) {
+ return env.fromData(typeInfo, data);
+ }
+
+ public static <T> DataStreamSource<T> fromData(
+ StreamExecutionEnvironment env, Collection<T> data,
TypeInformation<T> typeInfo) {
+ return env.fromData(data, typeInfo);
+ }
+}