This is an automated email from the ASF dual-hosted git repository. yuzelin pushed a commit to branch release-1.4 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit d3589fa11b18d73b88fa4f01b5f40b21dc647713 Author: Faiz <[email protected]> AuthorDate: Thu Apr 9 22:24:24 2026 +0800 [flink] fix compatibility issues of StreamExecutionEnv (#7615) --- .../flink/action/RemoveUnexistingFilesAction.java | 4 ++- .../paimon/flink/btree/BTreeIndexTopoBuilder.java | 6 +++- .../flink/globalindex/GenericIndexTopoBuilder.java | 7 ++-- .../utils/StreamExecutionEnvironmentUtils.java | 41 ++++++++++++++++++++++ .../utils/StreamExecutionEnvironmentUtils.java | 40 +++++++++++++++++++++ 5 files changed, 94 insertions(+), 4 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java index c67ddd7ea3..5639da0871 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java @@ -22,6 +22,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.flink.sink.Committable; import org.apache.paimon.flink.sink.CommittableTypeInfo; import org.apache.paimon.flink.utils.BoundedOneInputOperator; +import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils; import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataIncrement; @@ -108,7 +109,8 @@ public class RemoveUnexistingFilesAction extends TableActionBase { List<BinaryRow> binaryPartitions = fileStoreTable.newScan().listPartitions(); SingleOutputStreamOperator<byte[]> source = - env.fromData( + StreamExecutionEnvironmentUtils.fromData( + env, binaryPartitions.stream() .map(BinaryRow::toBytes) .collect(Collectors.toList()), diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java index 35830faac1..dbdfdc8fe0 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java @@ -34,6 +34,7 @@ import org.apache.paimon.flink.sorter.TableSortInfo; import org.apache.paimon.flink.sorter.TableSorter; import org.apache.paimon.flink.utils.BoundedOneInputOperator; import org.apache.paimon.flink.utils.JavaTypeInfo; +import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils; import org.apache.paimon.globalindex.GlobalIndexParallelWriter; import org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder; import org.apache.paimon.globalindex.btree.BTreeIndexOptions; @@ -211,7 +212,10 @@ public class BTreeIndexTopoBuilder { parallelism = Math.min(parallelism, maxParallelism); DataStream<Split> sourceStream = - env.fromData(new JavaTypeInfo<>(Split.class), rangeSplits.toArray(new Split[0])) + StreamExecutionEnvironmentUtils.fromData( + env, + new JavaTypeInfo<>(Split.class), + rangeSplits.toArray(new Split[0])) .name("Global Index Source " + " range=" + range) .setParallelism(1); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java index 865280763c..bcbabac793 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java @@ -28,6 +28,7 @@ import org.apache.paimon.flink.sink.NoopCommittableStateManager; import org.apache.paimon.flink.sink.StoreCommitter; import org.apache.paimon.flink.utils.BoundedOneInputOperator; import org.apache.paimon.flink.utils.JavaTypeInfo; +import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils; import org.apache.paimon.globalindex.GlobalIndexSingletonWriter; import org.apache.paimon.globalindex.ResultEntry; import org.apache.paimon.index.IndexFileMeta; @@ -179,7 +180,8 @@ public class GenericIndexTopoBuilder { ReadBuilder readBuilder = table.newReadBuilder().withReadType(projectedRowType); DataStream<ShardTask> source = - env.fromData( + StreamExecutionEnvironmentUtils.fromData( + env, new JavaTypeInfo<>(ShardTask.class), shardTasks.toArray(new ShardTask[0])) .name("Generic Index Source") @@ -201,7 +203,8 @@ public class GenericIndexTopoBuilder { if (!deletedIndexEntries.isEmpty()) { List<Committable> deleteCommittables = createDeleteCommittables(deletedIndexEntries); DataStream<Committable> deletes = - env.fromData( + StreamExecutionEnvironmentUtils.fromData( + env, new CommittableTypeInfo(), deleteCommittables.toArray(new Committable[0])) .name("Index Delete Source") diff --git a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/utils/StreamExecutionEnvironmentUtils.java b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/utils/StreamExecutionEnvironmentUtils.java new file mode 100644 index 0000000000..c602bd3162 --- /dev/null +++ b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/utils/StreamExecutionEnvironmentUtils.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.utils; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.util.Arrays; +import java.util.Collection; + +/** Utility methods about {@link StreamExecutionEnvironment} to resolve compatibility issues. */ +public class StreamExecutionEnvironmentUtils { + + @SafeVarargs + public static <T> DataStreamSource<T> fromData( + StreamExecutionEnvironment env, TypeInformation<T> typeInfo, T... data) { + return env.fromCollection(Arrays.asList(data), typeInfo); + } + + public static <T> DataStreamSource<T> fromData( + StreamExecutionEnvironment env, Collection<T> data, TypeInformation<T> typeInfo) { + return env.fromCollection(data, typeInfo); + } +} diff --git a/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/utils/StreamExecutionEnvironmentUtils.java b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/utils/StreamExecutionEnvironmentUtils.java new file mode 100644 index 0000000000..1fb5edf496 --- /dev/null +++ b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/utils/StreamExecutionEnvironmentUtils.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.utils; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.util.Collection; + +/** Utility methods about {@link StreamExecutionEnvironment} to resolve compatibility issues. */ +public class StreamExecutionEnvironmentUtils { + + @SafeVarargs + public static <T> DataStreamSource<T> fromData( + StreamExecutionEnvironment env, TypeInformation<T> typeInfo, T... data) { + return env.fromData(typeInfo, data); + } + + public static <T> DataStreamSource<T> fromData( + StreamExecutionEnvironment env, Collection<T> data, TypeInformation<T> typeInfo) { + return env.fromData(data, typeInfo); + } +}
