zhangyue19921010 commented on code in PR #13060:
URL: https://github.com/apache/hudi/pull/13060#discussion_r2028869779
##########
hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java:
##########
@@ -196,24 +196,51 @@ public static Option<PartitionBucketIndexHashingConfig> loadHashingConfig(Hoodie
   /**
    * Get Latest committed hashing config instant to load.
+   * If instant is empty, then return latest hashing config instant
    */
-  public static String getLatestHashingConfigInstantToLoad(HoodieTableMetaClient metaClient) {
+  public static Option<String> getHashingConfigInstantToLoad(HoodieTableMetaClient metaClient, Option<String> instant) {
     try {
       List<String> allCommittedHashingConfig = getCommittedHashingConfigInstants(metaClient);
-      return allCommittedHashingConfig.get(allCommittedHashingConfig.size() - 1);
+      if (instant.isPresent()) {
+        Option<String> res = getHashingConfigInstantToLoadBeforeOrOn(allCommittedHashingConfig, instant.get());
+        // fall back to look up archived hashing config instant before return empty
+        return res.isPresent() ? res : getHashingConfigInstantToLoadBeforeOrOn(getArchiveHashingConfigInstants(metaClient), instant.get());
+      } else {
+        return Option.of(allCommittedHashingConfig.get(allCommittedHashingConfig.size() - 1));
+      }
     } catch (Exception e) {
       throw new HoodieException("Failed to get hashing config instant to load.", e);
     }
   }
+  private static Option<String> getHashingConfigInstantToLoadBeforeOrOn(List<String> hashingConfigInstants, String instant) {
Review Comment:
Yes, Danny. After a bucket rescale completes, the data layout changes.
Therefore, consider Spark time travel, i.e. a snapshot view as of a specific
point in time (not an incremental query), see
https://hudi.apache.org/docs/sql_queries#time-travel-query.
For a query such as `SELECT * FROM <table_name> TIMESTAMP AS OF
<instant1> WHERE <filter_conditions>`, Hudi initializes
`specifiedQueryTimestamp` in `HoodieBaseRelation`:
```
protected lazy val specifiedQueryTimestamp: Option[String] =
  optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key)
    .map(HoodieSqlCommonUtils.formatQueryInstant)
```
Hudi then resolves the `Schema` and builds the `fsView` based on
`specifiedQueryTimestamp`. To construct the FsView, Hudi calls
`getLatestMergedFileSlicesBeforeOrOn(String partitionStr, String
maxInstantTime)`, which travels the file system view back to the specified
version. At this point it is also necessary to load the hashing_config that
was valid at that specific timestamp, so that the historical data layout is
interpreted correctly:
```
protected def listLatestFileSlices(globPaths: Seq[StoragePath], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSlice] = {
  queryTimestamp match {
    case Some(ts) =>
      specifiedQueryTimestamp.foreach(t => validateTimestampAsOf(metaClient, t))
      val partitionDirs = if (globPaths.isEmpty) {
        fileIndex.listFiles(partitionFilters, dataFilters)
      } else {
        val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, globPaths)
        inMemoryFileIndex.listFiles(partitionFilters, dataFilters)
      }
      val fsView = new HoodieTableFileSystemView(
        metaClient, timeline,
        sparkAdapter.getSparkPartitionedFileUtils.toFileStatuses(partitionDirs)
          .map(fileStatus => HadoopFSUtils.convertToStoragePathInfo(fileStatus))
          .asJava)
      fsView.getPartitionPaths.asScala.flatMap { partitionPath =>
        val relativePath = getRelativePartitionPath(convertToStoragePath(basePath), partitionPath)
        fsView.getLatestMergedFileSlicesBeforeOrOn(relativePath, ts).iterator().asScala
      }.toSeq
    case _ => Seq()
  }
}
```
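To illustrate the "before or on" lookup this implies, here is a minimal, self-contained sketch. It is not the PR's actual implementation: the class `HashingConfigLookup` and the methods `latestBeforeOrOn` and `resolve` are hypothetical names, it uses `java.util.Optional` instead of Hudi's `Option`, and it assumes instants are fixed-width timestamp strings sorted in ascending order, so that string comparison matches time order.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch, not Hudi code: pick the hashing config instant that was
// in effect at a given time-travel instant.
public class HashingConfigLookup {

  // Returns the latest instant that is <= queryInstant, or empty if none exists.
  // Assumption: sortedInstants is ascending and all instants have the same width.
  static Optional<String> latestBeforeOrOn(List<String> sortedInstants, String queryInstant) {
    for (int i = sortedInstants.size() - 1; i >= 0; i--) {
      String candidate = sortedInstants.get(i);
      if (candidate.compareTo(queryInstant) <= 0) {
        return Optional.of(candidate);
      }
    }
    return Optional.empty();
  }

  // Checks the active (committed) instants first, then falls back to archived ones.
  static Optional<String> resolve(List<String> active, List<String> archived, String queryInstant) {
    Optional<String> res = latestBeforeOrOn(active, queryInstant);
    return res.isPresent() ? res : latestBeforeOrOn(archived, queryInstant);
  }

  public static void main(String[] args) {
    List<String> archived = Arrays.asList("20240101000000000");
    List<String> active = Arrays.asList("20250101000000000", "20250301000000000");

    // Query falls between the two active configs: the earlier one applies.
    System.out.println(resolve(active, archived, "20250215000000000")); // Optional[20250101000000000]
    // Query predates every active config: fall back to the archived one.
    System.out.println(resolve(active, archived, "20240601000000000")); // Optional[20240101000000000]
    // Query predates everything: nothing to load.
    System.out.println(resolve(active, Collections.emptyList(), "20230101000000000")); // Optional.empty
  }
}
```

The backward scan keeps the sketch short; with many instants, a binary search over the sorted list would do the same job.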