This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/release-1.3 by this push:
new 17a7f415e0 [fileio] Delegate create()/getDefault() in
HadoopSecuredFileSystem (#7636)
17a7f415e0 is described below
commit 17a7f415e00782f3d2019830b36b82a60e9829c5
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Tue Apr 14 09:53:33 2026 +0800
[fileio] Delegate create()/getDefault() in HadoopSecuredFileSystem (#7636)
This PR cherry-picks the commit in #6602 and commit
https://github.com/apache/paimon/commit/6659fb70ecda5268e59138d1117388147af2b842
to release-1.3.
---
.../paimon/fs/hadoop/HadoopSecuredFileSystem.java | 35 ++++++++++++++++++++++
.../apache/paimon/flink/DockerImageVersions.java | 26 ++++++++++++++++
.../action/cdc/kafka/KafkaActionITCaseBase.java | 2 +-
.../paimon/flink/kafka/KafkaTableTestBase.java | 2 +-
4 files changed, 63 insertions(+), 2 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopSecuredFileSystem.java
b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopSecuredFileSystem.java
index 7d0b402383..d229579ce5 100644
---
a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopSecuredFileSystem.java
+++
b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopSecuredFileSystem.java
@@ -88,6 +88,41 @@ public class HadoopSecuredFileSystem extends FileSystem {
progressable));
}
+ @Override
+ public FSDataOutputStream create(Path path, boolean overwrite) throws
IOException {
+ return runSecuredWithIOException(() -> fileSystem.create(path,
overwrite));
+ }
+
+ @Override
+ public FSDataOutputStream create(
+ Path path, boolean overwrite, int bufferSize, short replication,
long blockSize)
+ throws IOException {
+ return runSecuredWithIOException(
+ () -> fileSystem.create(path, overwrite, bufferSize,
replication, blockSize));
+ }
+
+ @Override
+ public short getDefaultReplication(Path f) {
+ return runSecured(() -> fileSystem.getDefaultReplication(f));
+ }
+
+ @Deprecated
+ @Override
+ public short getDefaultReplication() {
+ return runSecured(() -> fileSystem.getDefaultReplication());
+ }
+
+ @Override
+ public long getDefaultBlockSize(Path f) {
+ return runSecured(() -> fileSystem.getDefaultBlockSize(f));
+ }
+
+ @Deprecated
+ @Override
+ public long getDefaultBlockSize() {
+ return runSecured(() -> fileSystem.getDefaultBlockSize());
+ }
+
@Override
public boolean exists(Path f) throws IOException {
return runSecuredWithIOException(() -> fileSystem.exists(f));
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/DockerImageVersions.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/DockerImageVersions.java
new file mode 100644
index 0000000000..15dcc78aa9
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/DockerImageVersions.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+/** Docker image versions. */
+public class DockerImageVersions {
+
+ public static final String KAFKA = "confluentinc/cp-kafka:7.9.2";
+ public static final String SCHEMA_REGISTRY =
"confluentinc/cp-schema-registry:7.9.2";
+}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
index ee5a653521..f36dd2bf87 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
@@ -18,13 +18,13 @@
package org.apache.paimon.flink.action.cdc.kafka;
+import org.apache.paimon.flink.DockerImageVersions;
import org.apache.paimon.flink.action.cdc.CdcActionITCaseBase;
import org.apache.paimon.utils.StringUtils;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.util.DockerImageVersions;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java
index eddb8ba897..2b21c0a9e0 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java
@@ -18,12 +18,12 @@
package org.apache.paimon.flink.kafka;
+import org.apache.paimon.flink.DockerImageVersions;
import org.apache.paimon.flink.util.AbstractTestBase;
import
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.util.DockerImageVersions;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;