This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 0c1ed06054a branch-3.1: [enhance](hudi) upgrade hudi to version 1.0.2 
#54083 (#54277)
0c1ed06054a is described below

commit 0c1ed06054aa5aad7ad09ca57fbce1c5f5e98fe0
Author: Socrates <[email protected]>
AuthorDate: Tue Aug 5 16:55:37 2025 +0800

    branch-3.1: [enhance](hudi) upgrade hudi to version 1.0.2 #54083 (#54277)
    
    bp: #54083
---
 fe/be-java-extensions/hadoop-hudi-scanner/pom.xml  |   1 -
 .../datasource/hudi/HudiSchemaCacheValue.java      |   2 +-
 .../apache/doris/datasource/hudi/HudiUtils.java    |  10 +-
 .../hudi/source/COWIncrementalRelation.java        |  22 +--
 .../hudi/source/HudiCachedFsViewProcessor.java     |   7 +-
 .../hudi/source/HudiCachedPartitionProcessor.java  |   4 +-
 .../hudi/source/HudiLocalEngineContext.java        | 175 ---------------------
 .../hudi/source/HudiPartitionProcessor.java        |   5 +-
 .../doris/datasource/hudi/source/HudiScanNode.java |   6 +-
 .../hudi/source/MORIncrementalRelation.java        |  18 +--
 .../tablefunction/HudiTableValuedFunction.java     |   1 -
 .../doris/tablefunction/MetadataGenerator.java     |   5 +-
 .../doris/datasource/hudi/HudiUtilsTest.java       |  97 ------------
 fe/pom.xml                                         |   2 +-
 .../data/external_table_p2/hudi/test_hudi_meta.out | Bin 4093 -> 2823 bytes
 15 files changed, 41 insertions(+), 314 deletions(-)

diff --git a/fe/be-java-extensions/hadoop-hudi-scanner/pom.xml 
b/fe/be-java-extensions/hadoop-hudi-scanner/pom.xml
index 205e21155a4..e3871844be1 100644
--- a/fe/be-java-extensions/hadoop-hudi-scanner/pom.xml
+++ b/fe/be-java-extensions/hadoop-hudi-scanner/pom.xml
@@ -32,7 +32,6 @@ under the License.
     <properties>
         <doris.home>${basedir}/../../</doris.home>
         <fe_ut_parallel>1</fe_ut_parallel>
-        <hudi.version>0.15.0</hudi.version>
         <luben.zstd.jni.version>1.5.4-2</luben.zstd.jni.version>
         <hive-apache.version>3.1.2-22</hive-apache.version>
     </properties>
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiSchemaCacheValue.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiSchemaCacheValue.java
index 5eef275fe47..b0b39f4b85e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiSchemaCacheValue.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiSchemaCacheValue.java
@@ -45,7 +45,7 @@ public class HudiSchemaCacheValue extends HMSSchemaCacheValue 
{
     }
 
     public InternalSchema getCommitInstantInternalSchema(HoodieTableMetaClient 
metaClient, Long commitInstantTime) {
-        return InternalSchemaCache.searchSchemaAndCache(commitInstantTime, 
metaClient, true);
+        return InternalSchemaCache.searchSchemaAndCache(commitInstantTime, 
metaClient);
     }
 
     public boolean isEnableSchemaEvolution() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java
index de670cbf262..eaa4f35cbca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java
@@ -50,7 +50,6 @@ import org.apache.avro.Schema.Field;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -73,7 +72,8 @@ public class HudiUtils {
 
     /**
      * Convert different query instant time format to the commit time format.
-     * Currently we support three kinds of instant time format for time travel 
query:
+     * Currently we support three kinds of instant time format for time travel
+     * query:
      * 1、yyyy-MM-dd HH:mm:ss
      * 2、yyyy-MM-dd
      * This will convert to 'yyyyMMdd000000'.
@@ -88,11 +88,11 @@ public class HudiUtils {
             return 
HoodieInstantTimeGenerator.getInstantForDateString(queryInstant);
         } else if (instantLength == 
HoodieInstantTimeGenerator.SECS_INSTANT_ID_LENGTH
                 || instantLength == 
HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH) { // for 
yyyyMMddHHmmss[SSS]
-            HoodieActiveTimeline.parseDateFromInstantTime(queryInstant); // 
validate the format
+            HoodieInstantTimeGenerator.parseDateFromInstantTime(queryInstant); 
// validate the format
             return queryInstant;
         } else if (instantLength == 10) { // for yyyy-MM-dd
             LocalDate date = LocalDate.parse(queryInstant, 
DEFAULT_DATE_FORMATTER);
-            return 
HoodieActiveTimeline.formatDate(java.sql.Date.valueOf(date));
+            return 
HoodieInstantTimeGenerator.formatDate(java.sql.Date.valueOf(date));
         } else {
             throw new IllegalArgumentException("Unsupported query instant time 
format: " + queryInstant
                     + ", Supported time format are: 'yyyy-MM-dd 
HH:mm:ss[.SSS]' "
@@ -308,7 +308,7 @@ public class HudiUtils {
         if (!snapshotInstant.isPresent()) {
             return 0L;
         }
-        return Long.parseLong(snapshotInstant.get().getTimestamp());
+        return Long.parseLong(snapshotInstant.get().requestedTime());
     }
 
     public static TablePartitionValues 
getPartitionValues(Optional<TableSnapshot> tableSnapshot,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java
index 7f8b0a216c3..3a72c1bacab 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java
@@ -92,14 +92,14 @@ public class COWIncrementalRelation implements 
IncrementalRelation {
         }
         String endInstantTime = 
optParams.getOrDefault("hoodie.datasource.read.end.instanttime",
                 hollowCommitHandling == 
HollowCommitHandling.USE_TRANSITION_TIME
-                        ? 
commitTimeline.lastInstant().get().getStateTransitionTime()
-                        : commitTimeline.lastInstant().get().getTimestamp());
+                        ? 
commitTimeline.lastInstant().get().getCompletionTime()
+                        : commitTimeline.lastInstant().get().requestedTime());
         startInstantArchived = 
commitTimeline.isBeforeTimelineStarts(startInstantTime);
         endInstantArchived = 
commitTimeline.isBeforeTimelineStarts(endInstantTime);
 
         HoodieTimeline commitsTimelineToReturn;
         if (hollowCommitHandling == HollowCommitHandling.USE_TRANSITION_TIME) {
-            commitsTimelineToReturn = 
commitTimeline.findInstantsInRangeByStateTransitionTime(startInstantTime,
+            commitsTimelineToReturn = 
commitTimeline.findInstantsInRangeByCompletionTime(startInstantTime,
                     endInstantTime);
         } else {
             commitsTimelineToReturn = 
commitTimeline.findInstantsInRange(startInstantTime, endInstantTime);
@@ -107,28 +107,28 @@ public class COWIncrementalRelation implements 
IncrementalRelation {
         List<HoodieInstant> commitsToReturn = 
commitsTimelineToReturn.getInstants();
 
         // todo: support configuration hoodie.datasource.read.incr.filters
-        StoragePath basePath = metaClient.getBasePathV2();
+        StoragePath basePath = metaClient.getBasePath();
         Map<String, String> regularFileIdToFullPath = new HashMap<>();
         Map<String, String> metaBootstrapFileIdToFullPath = new HashMap<>();
         HoodieTimeline replacedTimeline = 
commitsTimelineToReturn.getCompletedReplaceTimeline();
         Map<String, String> replacedFile = new HashMap<>();
         for (HoodieInstant instant : replacedTimeline.getInstants()) {
-            
HoodieReplaceCommitMetadata.fromBytes(metaClient.getActiveTimeline().getInstantDetails(instant).get(),
-                    
HoodieReplaceCommitMetadata.class).getPartitionToReplaceFileIds().forEach(
+            HoodieReplaceCommitMetadata metadata = 
metaClient.getActiveTimeline()
+                    .readReplaceCommitMetadata(instant);
+            metadata.getPartitionToReplaceFileIds().forEach(
                             (key, value) -> value.forEach(
                                     e -> replacedFile.put(e, 
FSUtils.constructAbsolutePath(basePath, key).toString())));
         }
 
         fileToWriteStat = new HashMap<>();
         for (HoodieInstant commit : commitsToReturn) {
-            HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(
-                    commitTimeline.getInstantDetails(commit).get(), 
HoodieCommitMetadata.class);
+            HoodieCommitMetadata metadata = 
metaClient.getActiveTimeline().readCommitMetadata(commit);
             metadata.getPartitionToWriteStats().forEach((partition, stats) -> {
                 for (HoodieWriteStat stat : stats) {
                     
fileToWriteStat.put(FSUtils.constructAbsolutePath(basePath, 
stat.getPath()).toString(), stat);
                 }
             });
-            if 
(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS.equals(commit.getTimestamp())) {
+            if 
(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS.equals(commit.requestedTime())) {
                 metadata.getFileIdAndFullPaths(basePath).forEach((k, v) -> {
                     if (!(replacedFile.containsKey(k) && 
v.startsWith(replacedFile.get(k)))) {
                         metaBootstrapFileIdToFullPath.put(k, v);
@@ -167,8 +167,8 @@ public class COWIncrementalRelation implements 
IncrementalRelation {
             startTs = startInstantTime;
             endTs = endInstantTime;
         } else {
-            startTs = commitsToReturn.get(0).getTimestamp();
-            endTs = commitsToReturn.get(commitsToReturn.size() - 
1).getTimestamp();
+            startTs = commitsToReturn.get(0).requestedTime();
+            endTs = commitsToReturn.get(commitsToReturn.size() - 
1).requestedTime();
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedFsViewProcessor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedFsViewProcessor.java
index 5c93f1650f9..b906c6a1dd2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedFsViewProcessor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedFsViewProcessor.java
@@ -25,6 +25,7 @@ import org.apache.doris.datasource.ExternalTable;
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.google.common.collect.Maps;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.view.FileSystemViewManager;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
@@ -52,7 +53,7 @@ public class HudiCachedFsViewProcessor {
 
     private HoodieTableFileSystemView createFsView(FsViewKey key) {
         HoodieMetadataConfig metadataConfig = 
HoodieMetadataConfig.newBuilder().build();
-        HudiLocalEngineContext ctx = new 
HudiLocalEngineContext(key.getClient().getStorageConf());
+        HoodieLocalEngineContext ctx = new 
HoodieLocalEngineContext(key.getClient().getStorageConf());
         return FileSystemViewManager.createInMemoryFileSystemView(ctx, 
key.getClient(), metadataConfig);
     }
 
@@ -117,12 +118,12 @@ public class HudiCachedFsViewProcessor {
             }
             FsViewKey fsViewKey = (FsViewKey) o;
             return Objects.equals(dbName, fsViewKey.dbName) && 
Objects.equals(tbName, fsViewKey.tbName)
-                && Objects.equals(client.getBasePathV2(), 
fsViewKey.client.getBasePathV2());
+                    && Objects.equals(client.getBasePath(), 
fsViewKey.client.getBasePath());
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(dbName, tbName, client.getBasePathV2());
+            return Objects.hash(dbName, tbName, client.getBasePath());
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java
index 6356698c067..40db60eb78b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java
@@ -99,7 +99,7 @@ public class HudiCachedPartitionProcessor extends 
HudiPartitionProcessor {
         if (!lastInstant.isPresent()) {
             return partitionValues;
         }
-        long lastTimestamp = Long.parseLong(lastInstant.get().getTimestamp());
+        long lastTimestamp = Long.parseLong(lastInstant.get().requestedTime());
         if (Long.parseLong(timestamp) == lastTimestamp) {
             return getPartitionValues(table, tableMetaClient, 
useHiveSyncPartition);
         }
@@ -130,7 +130,7 @@ public class HudiCachedPartitionProcessor extends 
HudiPartitionProcessor {
             return partitionValues;
         }
         try {
-            long lastTimestamp = 
Long.parseLong(lastInstant.get().getTimestamp());
+            long lastTimestamp = 
Long.parseLong(lastInstant.get().requestedTime());
             partitionValues = partitionCache.get(
                     new TablePartitionKey(table.getDbName(), table.getName(),
                             table.getHudiPartitionColumnTypes(lastTimestamp)));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiLocalEngineContext.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiLocalEngineContext.java
deleted file mode 100644
index fecc026cf8d..00000000000
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiLocalEngineContext.java
+++ /dev/null
@@ -1,175 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.datasource.hudi.source;
-
-import org.apache.hudi.common.data.HoodieAccumulator;
-import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
-import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
-import org.apache.hudi.common.data.HoodieListData;
-import org.apache.hudi.common.engine.EngineProperty;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.engine.LocalTaskContextSupplier;
-import org.apache.hudi.common.engine.TaskContextSupplier;
-import org.apache.hudi.common.function.FunctionWrapper;
-import org.apache.hudi.common.function.SerializableBiFunction;
-import org.apache.hudi.common.function.SerializableConsumer;
-import org.apache.hudi.common.function.SerializableFunction;
-import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
-import org.apache.hudi.common.function.SerializablePairFunction;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.ImmutablePair;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.storage.StorageConfiguration;
-
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/**
- * This file is copied from
- * org.apache.hudi.common.engine.HudiLocalEngineContext.
- * Because we need set ugi in thread pool
- * A java based engine context, use this implementation on the query engine
- * integrations if needed.
- */
-public final class HudiLocalEngineContext extends HoodieEngineContext {
-
-    public HudiLocalEngineContext(StorageConfiguration<?> conf) {
-        this(conf, new LocalTaskContextSupplier());
-    }
-
-    public HudiLocalEngineContext(StorageConfiguration<?> conf, 
TaskContextSupplier taskContextSupplier) {
-        super(conf, taskContextSupplier);
-    }
-
-    @Override
-    public HoodieAccumulator newAccumulator() {
-        return HoodieAtomicLongAccumulator.create();
-    }
-
-    @Override
-    public <T> HoodieData<T> emptyHoodieData() {
-        return HoodieListData.eager(Collections.emptyList());
-    }
-
-    @Override
-    public <T> HoodieData<T> parallelize(List<T> data, int parallelism) {
-        return HoodieListData.eager(data);
-    }
-
-    @Override
-    public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, 
int parallelism) {
-        return 
data.stream().parallel().map(FunctionWrapper.throwingMapWrapper(func)).collect(Collectors.toList());
-    }
-
-    @Override
-    public <I, K, V> List<V> mapToPairAndReduceByKey(
-            List<I> data, SerializablePairFunction<I, K, V> mapToPairFunc, 
SerializableBiFunction<V, V, V> reduceFunc,
-            int parallelism) {
-        return 
data.stream().parallel().map(FunctionWrapper.throwingMapToPairWrapper(mapToPairFunc))
-                .collect(Collectors.groupingBy(p -> 
p.getKey())).values().stream()
-                .map(list -> list.stream().map(e -> e.getValue())
-                        
.reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).get())
-                .collect(Collectors.toList());
-    }
-
-    @Override
-    public <I, K, V> Stream<ImmutablePair<K, V>> 
mapPartitionsToPairAndReduceByKey(
-            Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V> 
flatMapToPairFunc,
-            SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
-        return 
FunctionWrapper.throwingFlatMapToPairWrapper(flatMapToPairFunc).apply(data.parallel().iterator())
-                
.collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream()
-                .map(entry -> new ImmutablePair<>(entry.getKey(), 
entry.getValue().stream().map(
-                        
Pair::getValue).reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).orElse(null)))
-                .filter(Objects::nonNull);
-    }
-
-    @Override
-    public <I, K, V> List<V> reduceByKey(
-            List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, 
int parallelism) {
-        return data.stream().parallel()
-                .collect(Collectors.groupingBy(p -> 
p.getKey())).values().stream()
-                .map(list -> list.stream().map(e -> e.getValue())
-                        
.reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc))
-                        .orElse(null))
-                .filter(Objects::nonNull)
-                .collect(Collectors.toList());
-    }
-
-    @Override
-    public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, 
Stream<O>> func, int parallelism) {
-        return 
data.stream().parallel().flatMap(FunctionWrapper.throwingFlatMapWrapper(func))
-                .collect(Collectors.toList());
-    }
-
-    @Override
-    public <I> void foreach(List<I> data, SerializableConsumer<I> consumer, 
int parallelism) {
-        
data.stream().forEach(FunctionWrapper.throwingForeachWrapper(consumer));
-    }
-
-    @Override
-    public <I, K, V> Map<K, V> mapToPair(List<I> data, 
SerializablePairFunction<I, K, V> func, Integer parallelism) {
-        return 
data.stream().map(FunctionWrapper.throwingMapToPairWrapper(func)).collect(
-                Collectors.toMap(Pair::getLeft, Pair::getRight, (oldVal, 
newVal) -> newVal));
-    }
-
-    @Override
-    public void setProperty(EngineProperty key, String value) {
-        // no operation for now
-    }
-
-    @Override
-    public Option<String> getProperty(EngineProperty key) {
-        return Option.empty();
-    }
-
-    @Override
-    public void setJobStatus(String activeModule, String activityDescription) {
-        // no operation for now
-    }
-
-    @Override
-    public void putCachedDataIds(HoodieDataCacheKey cacheKey, int... ids) {
-        // no operation for now
-    }
-
-    @Override
-    public List<Integer> getCachedDataIds(HoodieDataCacheKey cacheKey) {
-        return Collections.emptyList();
-    }
-
-    @Override
-    public List<Integer> removeCachedDataIds(HoodieDataCacheKey cacheKey) {
-        return Collections.emptyList();
-    }
-
-    @Override
-    public void cancelJob(String jobId) {
-        // no operation for now
-    }
-
-    @Override
-    public void cancelAllJobs() {
-        // no operation for now
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java
index b1e5bd4a82d..ae127f9e1c4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java
@@ -21,6 +21,7 @@ import org.apache.doris.datasource.ExternalTable;
 
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineUtils;
@@ -51,9 +52,9 @@ public abstract class HudiPartitionProcessor {
                 .build();
 
         HoodieTableMetadata newTableMetadata = HoodieTableMetadata.create(
-                new HudiLocalEngineContext(tableMetaClient.getStorageConf()), 
tableMetaClient.getStorage(),
+                new 
HoodieLocalEngineContext(tableMetaClient.getStorageConf()), 
tableMetaClient.getStorage(),
                 metadataConfig,
-                tableMetaClient.getBasePathV2().toString(), true);
+                tableMetaClient.getBasePath().toString(), true);
 
         return newTableMetadata.getAllPartitionPaths();
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index 948e1f8c5e2..7748bd33ce8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -210,7 +210,7 @@ public class HudiScanNode extends HiveScanNode {
                 partitionInit = true;
                 return;
             }
-            queryInstant = snapshotInstant.get().getTimestamp();
+            queryInstant = snapshotInstant.get().requestedTime();
         }
 
         HudiSchemaCacheValue hudiSchemaCacheValue = 
HudiUtils.getSchemaCacheValue(hmsTable, queryInstant);
@@ -322,7 +322,7 @@ public class HudiScanNode extends HiveScanNode {
             this.selectedPartitionNum = prunedPartitions.size();
 
             String inputFormat = 
hmsTable.getRemoteTable().getSd().getInputFormat();
-            String basePath = metaClient.getBasePathV2().toString();
+            String basePath = metaClient.getBasePath().toString();
 
             List<HivePartition> hivePartitions = Lists.newArrayList();
             prunedPartitions.forEach(
@@ -368,7 +368,7 @@ public class HudiScanNode extends HiveScanNode {
         if (partition.isDummyPartition()) {
             partitionName = "";
         } else {
-            partitionName = 
FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
+            partitionName = 
FSUtils.getRelativePartitionPath(hudiClient.getBasePath(),
                     new StoragePath(partition.getPath()));
         }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java
index 7df01359922..69ca39e9ad6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java
@@ -87,8 +87,8 @@ public class MORIncrementalRelation implements 
IncrementalRelation {
         }
         endTimestamp = 
optParams.getOrDefault("hoodie.datasource.read.end.instanttime",
                 hollowCommitHandling == 
HollowCommitHandling.USE_TRANSITION_TIME
-                        ? timeline.lastInstant().get().getStateTransitionTime()
-                        : timeline.lastInstant().get().getTimestamp());
+                        ? timeline.lastInstant().get().getCompletionTime()
+                        : timeline.lastInstant().get().requestedTime());
 
         startInstantArchived = timeline.isBeforeTimelineStarts(startTimestamp);
         endInstantArchived = timeline.isBeforeTimelineStarts(endTimestamp);
@@ -96,7 +96,7 @@ public class MORIncrementalRelation implements 
IncrementalRelation {
         includedCommits = getIncludedCommits();
         commitsMetadata = getCommitsMetadata();
         affectedFilesInCommits = 
HoodieInputFormatUtils.listAffectedFilesForCommits(configuration,
-                metaClient.getBasePathV2(), commitsMetadata);
+                metaClient.getBasePath(), commitsMetadata);
         fullTableScan = shouldFullTableScan();
         if (hollowCommitHandling == HollowCommitHandling.USE_TRANSITION_TIME 
&& fullTableScan) {
             throw new HoodieException("Cannot use stateTransitionTime while 
enables full table scan");
@@ -108,10 +108,10 @@ public class MORIncrementalRelation implements 
IncrementalRelation {
             startTs = startTimestamp;
         } else {
             includeStartTime = true;
-            startTs = includedCommits.isEmpty() ? startTimestamp : 
includedCommits.get(0).getTimestamp();
+            startTs = includedCommits.isEmpty() ? startTimestamp : 
includedCommits.get(0).requestedTime();
         }
         endTs = endInstantArchived || includedCommits.isEmpty() ? endTimestamp
-                : includedCommits.get(includedCommits.size() - 
1).getTimestamp();
+                : includedCommits.get(includedCommits.size() - 
1).requestedTime();
     }
 
     @Override
@@ -128,7 +128,7 @@ public class MORIncrementalRelation implements 
IncrementalRelation {
             // If endTimestamp commit is not archived, will filter instants
             // before endTimestamp.
             if (hollowCommitHandling == 
HollowCommitHandling.USE_TRANSITION_TIME) {
-                return 
timeline.findInstantsInRangeByStateTransitionTime(startTimestamp, 
endTimestamp).getInstants();
+                return 
timeline.findInstantsInRangeByCompletionTime(startTimestamp, 
endTimestamp).getInstants();
             } else {
                 return timeline.findInstantsInRange(startTimestamp, 
endTimestamp).getInstants();
             }
@@ -153,7 +153,7 @@ public class MORIncrementalRelation implements 
IncrementalRelation {
             return true;
         }
         for (StoragePathInfo fileStatus : affectedFilesInCommits) {
-            if 
(!metaClient.getRawHoodieStorage().exists(fileStatus.getPath())) {
+            if (!metaClient.getStorage().exists(fileStatus.getPath())) {
                 return true;
             }
         }
@@ -190,13 +190,13 @@ public class MORIncrementalRelation implements 
IncrementalRelation {
         HoodieTimeline scanTimeline;
         if (hollowCommitHandling == HollowCommitHandling.USE_TRANSITION_TIME) {
             scanTimeline = metaClient.getCommitsAndCompactionTimeline()
-                    .findInstantsInRangeByStateTransitionTime(startTimestamp, 
endTimestamp);
+                    .findInstantsInRangeByCompletionTime(startTimestamp, 
endTimestamp);
         } else {
             scanTimeline = TimelineUtils.handleHollowCommitIfNeeded(
                             metaClient.getCommitsAndCompactionTimeline(), 
metaClient, hollowCommitHandling)
                     .findInstantsInRange(startTimestamp, endTimestamp);
         }
-        String latestCommit = includedCommits.get(includedCommits.size() - 
1).getTimestamp();
+        String latestCommit = includedCommits.get(includedCommits.size() - 
1).requestedTime();
         HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(metaClient, scanTimeline,
                 affectedFilesInCommits);
         Stream<FileSlice> fileSlices = 
HoodieTableMetadataUtil.getWritePartitionPaths(commitsMetadata)
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java
index b1d6c82329a..70e1ec84928 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java
@@ -55,7 +55,6 @@ public class HudiTableValuedFunction extends 
MetadataTableValuedFunction {
     private static final ImmutableList<Column> SCHEMA_TIMELINE = 
ImmutableList.of(
             new Column("timestamp", PrimitiveType.STRING, false),
             new Column("action", PrimitiveType.STRING, false),
-            new Column("file_name", PrimitiveType.STRING, false),
             new Column("state", PrimitiveType.STRING, false),
             new Column("state_transition_time", PrimitiveType.STRING, false));
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index e29c6bf10ee..9430032b7bb 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -370,11 +370,10 @@ public class MetadataGenerator {
                         hudiBasePathString, conf).getActiveTimeline();
                 for (HoodieInstant instant : timeline.getInstants()) {
                     TRow trow = new TRow();
-                    trow.addToColumnValue(new 
TCell().setStringVal(instant.getTimestamp()));
+                    trow.addToColumnValue(new 
TCell().setStringVal(instant.requestedTime()));
                     trow.addToColumnValue(new 
TCell().setStringVal(instant.getAction()));
-                    trow.addToColumnValue(new 
TCell().setStringVal(instant.getFileName()));
                     trow.addToColumnValue(new 
TCell().setStringVal(instant.getState().name()));
-                    trow.addToColumnValue(new 
TCell().setStringVal(instant.getStateTransitionTime()));
+                    trow.addToColumnValue(new 
TCell().setStringVal(instant.getCompletionTime()));
                     dataBatch.add(trow);
                 }
                 break;
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hudi/HudiUtilsTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hudi/HudiUtilsTest.java
index 409fc1daf72..070cd2f1859 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hudi/HudiUtilsTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hudi/HudiUtilsTest.java
@@ -26,8 +26,6 @@ import mockit.Mock;
 import mockit.MockUp;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -197,99 +195,4 @@ public class HudiUtilsTest {
         Assert.assertTrue(meta.delete());
         Files.delete(hudiTable);
     }
-
-    @Test
-    public void testFormatQueryInstantThreadSafety() throws Exception {
-        // Mock HoodieActiveTimeline and HoodieInstantTimeGenerator methods
-        new MockUp<HoodieInstantTimeGenerator>() {
-            @Mock
-            public String getInstantForDateString(String dateString) {
-                return "mocked_" + dateString.replace(" ", "_").replace(":", "_").replace(".", "_");
-            }
-        };
-
-        new MockUp<HoodieActiveTimeline>() {
-            @Mock
-            public void parseDateFromInstantTime(String instantTime) {
-                // Just a validation method, no return value needed
-            }
-
-            @Mock
-            public String formatDate(java.util.Date date) {
-                return "formatted_" + date.getTime();
-            }
-        };
-
-        // Test different date formats
-        String[] dateFormats = {
-                "2023-01-15",                 // yyyy-MM-dd format
-                "2023-01-15 14:30:25",        // yyyy-MM-dd HH:mm:ss format
-                "2023-01-15 14:30:25.123",    // yyyy-MM-dd HH:mm:ss.SSS format
-                "20230115143025",             // yyyyMMddHHmmss format
-                "20230115143025123"           // yyyyMMddHHmmssSSS format
-        };
-
-        // Single thread test for basic functionality
-        for (String dateFormat : dateFormats) {
-            String result = HudiUtils.formatQueryInstant(dateFormat);
-            Assert.assertNotNull(result);
-
-            // Verify expected format based on input length
-            if (dateFormat.length() == 10) { // yyyy-MM-dd
-                Assert.assertTrue(result.startsWith("formatted_"));
-            } else if (dateFormat.length() == 19 || dateFormat.length() == 23) { // yyyy-MM-dd HH:mm:ss[.SSS]
-                Assert.assertTrue(result.startsWith("mocked_"));
-            } else {
-                // yyyyMMddHHmmss[SSS] passes through
-                Assert.assertEquals(dateFormat, result);
-            }
-        }
-
-        // Multi-thread test for thread safety
-        int threadCount = 10;
-        int iterationsPerThread = 100;
-
-        Thread[] threads = new Thread[threadCount];
-        Exception[] threadExceptions = new Exception[threadCount];
-
-        // Create a map to store expected results for each date format
-        final java.util.Map<String, String> expectedResults = new java.util.HashMap<>();
-        for (String dateFormat : dateFormats) {
-            expectedResults.put(dateFormat, HudiUtils.formatQueryInstant(dateFormat));
-        }
-
-        for (int i = 0; i < threadCount; i++) {
-            final int threadId = i;
-            threads[i] = new Thread(() -> {
-                try {
-                    for (int j = 0; j < iterationsPerThread; j++) {
-                        // Each thread cycles through all date formats
-                        String dateFormat = dateFormats[j % dateFormats.length];
-                        String result = HudiUtils.formatQueryInstant(dateFormat);
-
-                        // Verify the result matches the expected value for this date format
-                        String expected = expectedResults.get(dateFormat);
-                        Assert.assertEquals("Thread " + threadId + " iteration " + j
-                                        + " got incorrect result for format " + dateFormat,
-                                expected, result);
-                    }
-                } catch (Exception e) {
-                    threadExceptions[threadId] = e;
-                }
-            });
-            threads[i].start();
-        }
-
-        // Wait for all threads to complete
-        for (Thread thread : threads) {
-                thread.join(5000); // Timeout after 5 seconds to ensure test doesn't run too long
-        }
-
-        // Check if any thread encountered exceptions
-        for (int i = 0; i < threadCount; i++) {
-            if (threadExceptions[i] != null) {
-                throw new AssertionError("Thread " + i + " failed with exception", threadExceptions[i]);
-            }
-        }
-    }
 }
diff --git a/fe/pom.xml b/fe/pom.xml
index a62110f6eff..238226f184d 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -226,7 +226,7 @@ under the License.
         <avro.version>1.12.0</avro.version>
        <parquet.version>1.15.2</parquet.version>
         <spark.version>3.4.3</spark.version>
-        <hudi.version>0.15.0</hudi.version>
+        <hudi.version>1.0.2</hudi.version>
         <obs.dependency.scope>compile</obs.dependency.scope>
         <cos.dependency.scope>compile</cos.dependency.scope>
         <gcs.dependency.scope>compile</gcs.dependency.scope>
diff --git a/regression-test/data/external_table_p2/hudi/test_hudi_meta.out b/regression-test/data/external_table_p2/hudi/test_hudi_meta.out
index 0c312efd3c0..af38b0e31fd 100644
Binary files a/regression-test/data/external_table_p2/hudi/test_hudi_meta.out and b/regression-test/data/external_table_p2/hudi/test_hudi_meta.out differ


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to