This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 446d2a0ca85 [bugfix](hudi)add timetravel for nereids (#38324)
446d2a0ca85 is described below
commit 446d2a0ca85c64968891727b4c349c6b2a82a8b3
Author: wuwenchi <[email protected]>
AuthorDate: Tue Jul 30 08:57:05 2024 +0800
[bugfix](hudi)add timetravel for nereids (#38324)
## Proposed changes
1. add timetravel for nereids.
```
select * from tb FOR TIME AS OF "2024-07-24 19:58:43";
select * from tb FOR TIME AS OF "20240724195843";
```
2. Add ugi authentication to the thread pool
---
.../apache/doris/datasource/FileQueryScanNode.java | 13 +-
.../hudi/source/HudiLocalEngineContext.java | 188 +++++++++++++++++++++
.../hudi/source/HudiPartitionProcessor.java | 3 +-
.../doris/datasource/hudi/source/HudiScanNode.java | 9 +-
.../datasource/iceberg/source/IcebergScanNode.java | 9 +-
.../glue/translator/PhysicalPlanTranslator.java | 15 +-
.../java/org/apache/doris/planner/ScanNode.java | 3 +
.../hudi/test_hudi_timetravel.out | 125 ++++++++++++++
.../hudi/test_hudi_timetravel.groovy | 107 ++++++++++++
9 files changed, 451 insertions(+), 21 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index a5c5f501a1b..df3fbca56d4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -94,7 +94,6 @@ public abstract class FileQueryScanNode extends FileScanNode {
protected String brokerName;
- @Getter
protected TableSnapshot tableSnapshot;
/**
@@ -581,4 +580,16 @@ public abstract class FileQueryScanNode extends
FileScanNode {
}
}
}
+
+ public void setQueryTableSnapshot(TableSnapshot tableSnapshot) {
+ this.tableSnapshot = tableSnapshot;
+ }
+
+ public TableSnapshot getQueryTableSnapshot() {
+ TableSnapshot snapshot = desc.getRef().getTableSnapshot();
+ if (snapshot != null) {
+ return snapshot;
+ }
+ return this.tableSnapshot;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiLocalEngineContext.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiLocalEngineContext.java
new file mode 100644
index 00000000000..26ef6fdfef7
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiLocalEngineContext.java
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hudi.source;
+
+import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.data.HoodieAccumulator;
+import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
+import org.apache.hudi.common.data.HoodieListData;
+import org.apache.hudi.common.engine.EngineProperty;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.LocalTaskContextSupplier;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.function.FunctionWrapper;
+import org.apache.hudi.common.function.SerializableBiFunction;
+import org.apache.hudi.common.function.SerializableConsumer;
+import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
+import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This file is copied from
org.apache.hudi.common.engine.HoodieLocalEngineContext.
+ * Because we need to set the ugi in the thread pool.
+ * A Java-based engine context; use this implementation in query engine
+ * integrations if needed.
+ */
+public final class HudiLocalEngineContext extends HoodieEngineContext {
+
+ public HudiLocalEngineContext(Configuration conf) {
+ this(conf, new LocalTaskContextSupplier());
+ }
+
+ public HudiLocalEngineContext(Configuration conf, TaskContextSupplier
taskContextSupplier) {
+ super(new SerializableConfiguration(conf), taskContextSupplier);
+ }
+
+ @Override
+ public HoodieAccumulator newAccumulator() {
+ return HoodieAtomicLongAccumulator.create();
+ }
+
+ @Override
+ public <T> HoodieData<T> emptyHoodieData() {
+ return HoodieListData.eager(Collections.emptyList());
+ }
+
+ @Override
+ public <T> HoodieData<T> parallelize(List<T> data, int parallelism) {
+ return HoodieListData.eager(data);
+ }
+
+ @Override
+ public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func,
int parallelism) {
+ return data.stream().parallel().map(v1 -> {
+ try {
+ return
HiveMetaStoreClientHelper.ugiDoAs(getHadoopConf().get(), () -> func.apply(v1));
+ } catch (Exception e) {
+ throw new HoodieException("Error occurs when executing map",
e);
+ }
+ }).collect(Collectors.toList());
+ }
+
+ @Override
+ public <I, K, V> List<V> mapToPairAndReduceByKey(
+ List<I> data,
+ SerializablePairFunction<I, K, V> mapToPairFunc,
+ SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+ return
data.stream().parallel().map(FunctionWrapper.throwingMapToPairWrapper(mapToPairFunc))
+ .collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
+ .map(list ->
+ list.stream()
+ .map(e -> e.getValue())
+
.reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).get())
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public <I, K, V> Stream<ImmutablePair<K, V>>
mapPartitionsToPairAndReduceByKey(
+ Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V>
flatMapToPairFunc,
+ SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+ return
FunctionWrapper.throwingFlatMapToPairWrapper(flatMapToPairFunc).apply(data.parallel().iterator())
+ .collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream()
+ .map(entry -> new ImmutablePair<>(entry.getKey(),
entry.getValue().stream().map(
+
Pair::getValue).reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).orElse(null)))
+ .filter(Objects::nonNull);
+ }
+
+ @Override
+ public <I, K, V> List<V> reduceByKey(
+ List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc,
int parallelism) {
+ return data.stream().parallel()
+ .collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
+ .map(list ->
+ list.stream()
+ .map(e -> e.getValue())
+
.reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).orElse(null))
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I,
Stream<O>> func, int parallelism) {
+ return
+
data.stream().parallel().flatMap(FunctionWrapper.throwingFlatMapWrapper(func)).collect(Collectors.toList());
+ }
+
+ @Override
+ public <I> void foreach(List<I> data, SerializableConsumer<I> consumer,
int parallelism) {
+
data.stream().forEach(FunctionWrapper.throwingForeachWrapper(consumer));
+ }
+
+ @Override
+ public <I, K, V> Map<K, V> mapToPair(List<I> data,
SerializablePairFunction<I, K, V> func, Integer parallelism) {
+ return
data.stream().map(FunctionWrapper.throwingMapToPairWrapper(func)).collect(
+ Collectors.toMap(Pair::getLeft, Pair::getRight, (oldVal, newVal)
-> newVal)
+ );
+ }
+
+ @Override
+ public void setProperty(EngineProperty key, String value) {
+ // no operation for now
+ }
+
+ @Override
+ public Option<String> getProperty(EngineProperty key) {
+ return Option.empty();
+ }
+
+ @Override
+ public void setJobStatus(String activeModule, String activityDescription) {
+ // no operation for now
+ }
+
+ @Override
+ public void putCachedDataIds(HoodieDataCacheKey cacheKey, int... ids) {
+ // no operation for now
+ }
+
+ @Override
+ public List<Integer> getCachedDataIds(HoodieDataCacheKey cacheKey) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<Integer> removeCachedDataIds(HoodieDataCacheKey cacheKey) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void cancelJob(String jobId) {
+ // no operation for now
+ }
+
+ @Override
+ public void cancelAllJobs() {
+ // no operation for now
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java
index 4baa1477041..738b2638588 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java
@@ -18,7 +18,6 @@
package org.apache.doris.datasource.hudi.source;
import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
@@ -50,7 +49,7 @@ public abstract class HudiPartitionProcessor {
.build();
HoodieTableMetadata newTableMetadata = HoodieTableMetadata.create(
- new HoodieLocalEngineContext(tableMetaClient.getHadoopConf()),
metadataConfig,
+ new HudiLocalEngineContext(tableMetaClient.getHadoopConf()),
metadataConfig,
tableMetaClient.getBasePathV2().toString(), true);
return newTableMetadata.getAllPartitionPaths();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index 111f0877f3a..66c14446845 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -18,6 +18,7 @@
package org.apache.doris.datasource.hudi.source;
import org.apache.doris.analysis.TableScanParams;
+import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.PartitionItem;
@@ -203,8 +204,12 @@ public class HudiScanNode extends HiveScanNode {
}
timeline =
hudiClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
- if (desc.getRef().getTableSnapshot() != null) {
- queryInstant = desc.getRef().getTableSnapshot().getTime();
+ TableSnapshot tableSnapshot = getQueryTableSnapshot();
+ if (tableSnapshot != null) {
+ if (tableSnapshot.getType() == TableSnapshot.VersionType.VERSION) {
+ throw new UserException("Hudi does not support `FOR VERSION AS
OF`, please use `FOR TIME AS OF`");
+ }
+ queryInstant = tableSnapshot.getTime().replaceAll("[-: ]", "");
snapshotTimestamp = Option.of(queryInstant);
} else {
Option<HoodieInstant> snapshotInstant = timeline.lastInstant();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index a79256f7f0d..f5110ac5bb0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -274,10 +274,7 @@ public class IcebergScanNode extends FileQueryScanNode {
}
public Long getSpecifiedSnapshot() throws UserException {
- TableSnapshot tableSnapshot =
source.getDesc().getRef().getTableSnapshot();
- if (tableSnapshot == null) {
- tableSnapshot = this.tableSnapshot;
- }
+ TableSnapshot tableSnapshot = getQueryTableSnapshot();
if (tableSnapshot != null) {
TableSnapshot.VersionType type = tableSnapshot.getType();
try {
@@ -440,8 +437,4 @@ public class IcebergScanNode extends FileQueryScanNode {
return super.getNodeExplainString(prefix, detailLevel)
+ String.format("%sicebergPredicatePushdown=\n%s\n", prefix,
sb);
}
-
- public void setTableSnapshot(TableSnapshot tableSnapshot) {
- this.tableSnapshot = tableSnapshot;
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index ff3de2249a6..2dfbe1dd0fa 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -47,6 +47,7 @@ import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.es.source.EsScanNode;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HMSExternalTable.DLAType;
@@ -577,10 +578,6 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
switch (((HMSExternalTable) table).getDlaType()) {
case ICEBERG:
scanNode = new IcebergScanNode(context.nextPlanNodeId(),
tupleDescriptor, false);
- IcebergScanNode icebergScanNode = (IcebergScanNode)
scanNode;
- if (fileScan.getTableSnapshot().isPresent()) {
-
icebergScanNode.setTableSnapshot(fileScan.getTableSnapshot().get());
- }
break;
case HIVE:
scanNode = new HiveScanNode(context.nextPlanNodeId(),
tupleDescriptor, false);
@@ -596,9 +593,6 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
}
} else if (table instanceof IcebergExternalTable) {
scanNode = new IcebergScanNode(context.nextPlanNodeId(),
tupleDescriptor, false);
- if (fileScan.getTableSnapshot().isPresent()) {
- ((IcebergScanNode)
scanNode).setTableSnapshot(fileScan.getTableSnapshot().get());
- }
} else if (table instanceof PaimonExternalTable) {
scanNode = new PaimonScanNode(context.nextPlanNodeId(),
tupleDescriptor, false);
} else if (table instanceof TrinoConnectorExternalTable) {
@@ -610,6 +604,9 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
} else {
throw new RuntimeException("do not support table type " +
table.getType());
}
+ if (fileScan.getTableSnapshot().isPresent() && scanNode instanceof
FileQueryScanNode) {
+ ((FileQueryScanNode)
scanNode).setQueryTableSnapshot(fileScan.getTableSnapshot().get());
+ }
return getPlanFragmentForPhysicalFileScan(fileScan, context, scanNode,
table, tupleDescriptor);
}
@@ -671,7 +668,9 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
PhysicalHudiScan hudiScan = (PhysicalHudiScan) fileScan;
ScanNode scanNode = new HudiScanNode(context.nextPlanNodeId(),
tupleDescriptor, false,
hudiScan.getScanParams(), hudiScan.getIncrementalRelation());
-
+ if (fileScan.getTableSnapshot().isPresent()) {
+ ((FileQueryScanNode)
scanNode).setQueryTableSnapshot(fileScan.getTableSnapshot().get());
+ }
return getPlanFragmentForPhysicalFileScan(fileScan, context, scanNode,
table, tupleDescriptor);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index be02137cd1a..50b0f5a0269 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -34,6 +34,7 @@ import org.apache.doris.analysis.PredicateUtils;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.Column;
@@ -107,6 +108,8 @@ public abstract class ScanNode extends PlanNode implements
SplitGenerator {
// support multi topn filter
protected final List<SortNode> topnFilterSortNodes = Lists.newArrayList();
+ protected TableSnapshot tableSnapshot;
+
public ScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,
StatisticalType statisticalType) {
super(id, desc.getId().asList(), planNodeName, statisticalType);
this.desc = desc;
diff --git
a/regression-test/data/external_table_p2/hudi/test_hudi_timetravel.out
b/regression-test/data/external_table_p2/hudi/test_hudi_timetravel.out
new file mode 100644
index 00000000000..38b6ff7846f
--- /dev/null
+++ b/regression-test/data/external_table_p2/hudi/test_hudi_timetravel.out
@@ -0,0 +1,125 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !q00 --
+20240724195843565 20240724195843565_0_0 20240724195843565_0_0
PAR1=para/par2=para
7a788b37-9ef0-409a-bc42-6793e35fcad5-0_0-129-127_20240724195848377.parquet
1 a b para para
+20240724195845718 20240724195845718_0_0 20240724195845718_0_0
PAR1=para/par2=parb
fef19b36-4a18-4d8c-b204-1ed448f7de51-0_0-152-155_20240724195850799.parquet
2 a b para parb
+20240724195848377 20240724195848377_0_1 20240724195848377_0_0
PAR1=para/par2=para
7a788b37-9ef0-409a-bc42-6793e35fcad5-0_0-129-127_20240724195848377.parquet
3 a b para para
+20240724195850799 20240724195850799_0_1 20240724195850799_0_0
PAR1=para/par2=parb
fef19b36-4a18-4d8c-b204-1ed448f7de51-0_0-152-155_20240724195850799.parquet
4 a b para parb
+
+-- !q01 --
+
+-- !q02 --
+
+-- !q01 --
+20240724195843565 20240724195843565_0_0 20240724195843565_0_0
PAR1=para/par2=para
7a788b37-9ef0-409a-bc42-6793e35fcad5-0_0-129-127_20240724195848377.parquet
1 a b para para
+20240724195845718 20240724195845718_0_0 20240724195845718_0_0
PAR1=para/par2=parb
fef19b36-4a18-4d8c-b204-1ed448f7de51-0_0-152-155_20240724195850799.parquet
2 a b para parb
+20240724195848377 20240724195848377_0_1 20240724195848377_0_0
PAR1=para/par2=para
7a788b37-9ef0-409a-bc42-6793e35fcad5-0_0-129-127_20240724195848377.parquet
3 a b para para
+20240724195850799 20240724195850799_0_1 20240724195850799_0_0
PAR1=para/par2=parb
fef19b36-4a18-4d8c-b204-1ed448f7de51-0_0-152-155_20240724195850799.parquet
4 a b para parb
+
+-- !q02 --
+20240724195843565 20240724195843565_0_0 20240724195843565_0_0
PAR1=para/par2=para
7a788b37-9ef0-409a-bc42-6793e35fcad5-0_0-129-127_20240724195848377.parquet
1 a b para para
+20240724195845718 20240724195845718_0_0 20240724195845718_0_0
PAR1=para/par2=parb
fef19b36-4a18-4d8c-b204-1ed448f7de51-0_0-152-155_20240724195850799.parquet
2 a b para parb
+20240724195848377 20240724195848377_0_1 20240724195848377_0_0
PAR1=para/par2=para
7a788b37-9ef0-409a-bc42-6793e35fcad5-0_0-129-127_20240724195848377.parquet
3 a b para para
+20240724195850799 20240724195850799_0_1 20240724195850799_0_0
PAR1=para/par2=parb
fef19b36-4a18-4d8c-b204-1ed448f7de51-0_0-152-155_20240724195850799.parquet
4 a b para parb
+
+-- !q03 --
+
+-- !q04 --
+
+-- !q05 --
+1 a b para para
+
+-- !q06 --
+1 a b para para
+
+-- !q07 --
+1 a b para para
+2 a b para parb
+
+-- !q08 --
+1 a b para para
+2 a b para parb
+
+-- !q09 --
+1 a b para para
+2 a b para parb
+3 a b para para
+
+-- !q10 --
+1 a b para para
+2 a b para parb
+3 a b para para
+
+-- !q11 --
+1 a b para para
+2 a b para parb
+3 a b para para
+4 a b para parb
+
+-- !q12 --
+1 a b para para
+2 a b para parb
+3 a b para para
+4 a b para parb
+
+-- !q50 --
+20240724195853736 20240724195853736_0_0 Id:1 PAR1=para/par2=para
c5a8ebb7-f929-43ba-9f8d-d733fae27605-0_0-203-210_20240724195858450.parquet
1 a b para para
+20240724195856338 20240724195856338_0_0 Id:2 PAR1=para/par2=parb
23756678-cf81-481c-b559-85c0b47b0a80-0_0-219-228_20240724195902682.parquet
2 a b para parb
+20240724195858450 20240724195858450_0_1 Id:3 PAR1=para/par2=para
c5a8ebb7-f929-43ba-9f8d-d733fae27605-0_0-203-210_20240724195858450.parquet
3 a b para para
+20240724195902682 20240724195902682_0_1 Id:4 PAR1=para/par2=parb
23756678-cf81-481c-b559-85c0b47b0a80-0_0-219-228_20240724195902682.parquet
4 a b para parb
+
+-- !q51 --
+
+-- !q52 --
+
+-- !q51 --
+20240724195853736 20240724195853736_0_0 Id:1 PAR1=para/par2=para
c5a8ebb7-f929-43ba-9f8d-d733fae27605-0_0-203-210_20240724195858450.parquet
1 a b para para
+20240724195856338 20240724195856338_0_0 Id:2 PAR1=para/par2=parb
23756678-cf81-481c-b559-85c0b47b0a80-0_0-219-228_20240724195902682.parquet
2 a b para parb
+20240724195858450 20240724195858450_0_1 Id:3 PAR1=para/par2=para
c5a8ebb7-f929-43ba-9f8d-d733fae27605-0_0-203-210_20240724195858450.parquet
3 a b para para
+20240724195902682 20240724195902682_0_1 Id:4 PAR1=para/par2=parb
23756678-cf81-481c-b559-85c0b47b0a80-0_0-219-228_20240724195902682.parquet
4 a b para parb
+
+-- !q52 --
+20240724195853736 20240724195853736_0_0 Id:1 PAR1=para/par2=para
c5a8ebb7-f929-43ba-9f8d-d733fae27605-0_0-203-210_20240724195858450.parquet
1 a b para para
+20240724195856338 20240724195856338_0_0 Id:2 PAR1=para/par2=parb
23756678-cf81-481c-b559-85c0b47b0a80-0_0-219-228_20240724195902682.parquet
2 a b para parb
+20240724195858450 20240724195858450_0_1 Id:3 PAR1=para/par2=para
c5a8ebb7-f929-43ba-9f8d-d733fae27605-0_0-203-210_20240724195858450.parquet
3 a b para para
+20240724195902682 20240724195902682_0_1 Id:4 PAR1=para/par2=parb
23756678-cf81-481c-b559-85c0b47b0a80-0_0-219-228_20240724195902682.parquet
4 a b para parb
+
+-- !q53 --
+
+-- !q54 --
+
+-- !q55 --
+1 a b para para
+
+-- !q56 --
+1 a b para para
+
+-- !q57 --
+1 a b para para
+2 a b para parb
+
+-- !q58 --
+1 a b para para
+2 a b para parb
+
+-- !q59 --
+1 a b para para
+2 a b para parb
+3 a b para para
+
+-- !q60 --
+1 a b para para
+2 a b para parb
+3 a b para para
+
+-- !q61 --
+1 a b para para
+2 a b para parb
+3 a b para para
+4 a b para parb
+
+-- !q62 --
+1 a b para para
+2 a b para parb
+3 a b para para
+4 a b para parb
+
diff --git
a/regression-test/suites/external_table_p2/hudi/test_hudi_timetravel.groovy
b/regression-test/suites/external_table_p2/hudi/test_hudi_timetravel.groovy
new file mode 100644
index 00000000000..e8c85969832
--- /dev/null
+++ b/regression-test/suites/external_table_p2/hudi/test_hudi_timetravel.groovy
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_hudi_timetravel",
"p2,external,hudi,external_remote,external_remote_hudi") {
+
+ String enabled = context.config.otherConfigs.get("enableExternalHudiTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("disable hudi test")
+ }
+
+ String catalog_name = "test_hudi_timetravel"
+ String props = context.config.otherConfigs.get("hudiEmrCatalog")
+ sql """drop catalog if exists ${catalog_name};"""
+ sql """
+ create catalog if not exists ${catalog_name} properties (
+ ${props}
+ );
+ """
+
+ sql """switch ${catalog_name};"""
+ sql """ use regression_hudi;"""
+ sql """ set enable_fallback_to_original_planner=false """
+
+ qt_q00 """select * from timetravel_cow order by id"""
+ qt_q01 """select * from timetravel_cow FOR TIME AS OF "2024-07-24" order
by id""" // no data
+ qt_q02 """select * from timetravel_cow FOR TIME AS OF "20240724" order by
id""" // no data
+ qt_q01 """select * from timetravel_cow FOR TIME AS OF "2024-07-25" order
by id"""
+ qt_q02 """select * from timetravel_cow FOR TIME AS OF "20240725" order by
id"""
+ qt_q03 """ select id, val1,val2,par1,par2 from timetravel_cow FOR TIME AS
OF "2024-07-24 19:58:43" order by id """ // no data
+ qt_q04 """ select id, val1,val2,par1,par2 from timetravel_cow FOR TIME AS
OF "20240724195843" order by id """ // no data
+ qt_q05 """ select id, val1,val2,par1,par2 from timetravel_cow FOR TIME AS
OF "2024-07-24 19:58:44" order by id """ // one
+ qt_q06 """ select id, val1,val2,par1,par2 from timetravel_cow FOR TIME AS
OF "20240724195844" order by id """ //one
+ qt_q07 """ select id, val1,val2,par1,par2 from timetravel_cow FOR TIME AS
OF "2024-07-24 19:58:48" order by id """ // two
+ qt_q08 """ select id, val1,val2,par1,par2 from timetravel_cow FOR TIME AS
OF "20240724195848" order by id """ // two
+ qt_q09 """ select id, val1,val2,par1,par2 from timetravel_cow FOR TIME AS
OF "2024-07-24 19:58:49" order by id """ // three
+ qt_q10 """ select id, val1,val2,par1,par2 from timetravel_cow FOR TIME AS
OF "20240724195849" order by id """ // three
+ qt_q11 """ select id, val1,val2,par1,par2 from timetravel_cow FOR TIME AS
OF "2024-07-24 19:58:51" order by id """ // four
+ qt_q12 """ select id, val1,val2,par1,par2 from timetravel_cow FOR TIME AS
OF "20240724195851" order by id """ // four
+
+ qt_q50 """select * from timetravel_mor order by id"""
+ qt_q51 """select * from timetravel_mor FOR TIME AS OF "2024-07-24" order
by id""" // no data
+ qt_q52 """select * from timetravel_mor FOR TIME AS OF "20240724" order by
id""" // no data
+ qt_q51 """select * from timetravel_mor FOR TIME AS OF "2024-07-25" order
by id"""
+ qt_q52 """select * from timetravel_mor FOR TIME AS OF "20240725" order by
id"""
+ qt_q53 """ select id, val1,val2,par1,par2 from timetravel_mor FOR TIME AS
OF "2024-07-24 19:58:53" order by id """ // no data
+ qt_q54 """ select id, val1,val2,par1,par2 from timetravel_mor FOR TIME AS
OF "20240724195853" order by id """ // no data
+ qt_q55 """ select id, val1,val2,par1,par2 from timetravel_mor FOR TIME AS
OF "2024-07-24 19:58:54" order by id """ // one
+ qt_q56 """ select id, val1,val2,par1,par2 from timetravel_mor FOR TIME AS
OF "20240724195854" order by id """ //one
+ qt_q57 """ select id, val1,val2,par1,par2 from timetravel_mor FOR TIME AS
OF "2024-07-24 19:58:58" order by id """ // two
+ qt_q58 """ select id, val1,val2,par1,par2 from timetravel_mor FOR TIME AS
OF "20240724195858" order by id """ // two
+ qt_q59 """ select id, val1,val2,par1,par2 from timetravel_mor FOR TIME AS
OF "2024-07-24 19:58:59" order by id """ // three
+ qt_q60 """ select id, val1,val2,par1,par2 from timetravel_mor FOR TIME AS
OF "20240724195859" order by id """ // three
+ qt_q61 """ select id, val1,val2,par1,par2 from timetravel_mor FOR TIME AS
OF "2024-07-24 19:59:03" order by id """ // four
+ qt_q62 """ select id, val1,val2,par1,par2 from timetravel_mor FOR TIME AS
OF "20240724195903" order by id """ // four
+}
+
+
+/*
+
+create table timetravel_cow (
+ Id int,
+ VAL1 string,
+ val2 string,
+ PAR1 string,
+ par2 string
+) using hudi
+partitioned by (par1, par2)
+TBLPROPERTIES (
+ 'type' = 'cow');
+
+create table timetravel_mor (
+ Id int,
+ VAL1 string,
+ val2 string,
+ PAR1 string,
+ par2 string
+) using hudi
+partitioned by (par1, par2)
+TBLPROPERTIES (
+ 'primaryKey' = 'Id',
+ 'type' = 'mor');
+
+insert into timetravel_cow values (1, 'a','b','para','para');
+insert into timetravel_cow values (2, 'a','b','para','parb');
+insert into timetravel_cow values (3, 'a','b','para','para');
+insert into timetravel_cow values (4, 'a','b','para','parb');
+
+insert into timetravel_mor values (1, 'a','b','para','para');
+insert into timetravel_mor values (2, 'a','b','para','parb');
+insert into timetravel_mor values (3, 'a','b','para','para');
+insert into timetravel_mor values (4, 'a','b','para','parb');
+
+*/
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]