YuweiXiao commented on code in PR #4958:
URL: https://github.com/apache/hudi/pull/4958#discussion_r917394381
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:
##########
@@ -67,6 +77,61 @@ public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> writeStatu
                                                 HoodieEngineContext context,
                                                 HoodieTable hoodieTable)
        throws HoodieIndexException {
+    throw new HoodieIndexException("Consistent hashing index does not support update location without the instant parameter");
+  }
+
+  /**
+   * Persist hashing metadata to storage. Only clustering operations will modify the metadata.
+   * For example, splitting & merging buckets, or just sorting and producing a new bucket.
+   */
+  @Override
+  public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> writeStatuses,
+                                                HoodieEngineContext context,
+                                                HoodieTable hoodieTable,
+                                                String instantTime)
+      throws HoodieIndexException {
+    HoodieInstant instant = hoodieTable.getMetaClient().getActiveTimeline().findInstantsAfterOrEquals(instantTime, 1).firstInstant().get();
+    ValidationUtils.checkState(instant.getTimestamp().equals(instantTime), "Cannot get the same instant, instantTime: " + instantTime);
+    if (!instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+      return writeStatuses;
+    }
+
+    // Double-check if it is a clustering operation by trying to obtain the clustering plan
+    Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlanPair =
+        ClusteringUtils.getClusteringPlan(hoodieTable.getMetaClient(), HoodieTimeline.getReplaceCommitRequestedInstant(instantTime));
+    if (!instantPlanPair.isPresent()) {
+      return writeStatuses;
+    }
+
+    HoodieClusteringPlan plan = instantPlanPair.get().getRight();
+    HoodieJavaRDD.getJavaRDD(context.parallelize(plan.getInputGroups().stream().map(HoodieClusteringGroup::getExtraMetadata).collect(Collectors.toList())))
+        .mapToPair(m -> new Tuple2<>(m.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_PARTITION_KEY), m))
+        .groupByKey().foreach((input) -> {
+          // Process each partition
+          String partition = input._1();
+          List<ConsistentHashingNode> childNodes = new ArrayList<>();
+          int seqNo = 0;
Review Comment:
   No. Actually, only clustering operations go through the above code path, and we constrain each partition to at most one clustering plan at a time, so multiple writers (clustering executors) can never modify the same partition's hashing metadata concurrently.
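   To illustrate the single-writer argument, here is a minimal, Spark-free sketch of the `mapToPair(...).groupByKey()` step in the diff: clustering-group metadata maps are keyed by their target partition, so each partition's metadata ends up in exactly one group and is rewritten by exactly one task. The `PARTITION_KEY` constant and the plain-`Map` metadata shape are hypothetical stand-ins for `SparkConsistentBucketClusteringPlanStrategy.METADATA_PARTITION_KEY` and the clustering groups' extra metadata, not Hudi's actual types.

   ```java
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;

   public class PartitionGrouping {
       // Hypothetical stand-in for METADATA_PARTITION_KEY in the real code.
       static final String PARTITION_KEY = "partition";

       // Group metadata maps by target partition, mirroring the Spark
       // mapToPair(...).groupByKey() in the diff: afterwards each partition
       // appears exactly once, so a single task owns its hashing metadata.
       static Map<String, List<Map<String, String>>> groupByPartition(List<Map<String, String>> metadata) {
           return metadata.stream()
               .collect(Collectors.groupingBy(m -> m.get(PARTITION_KEY)));
       }

       public static void main(String[] args) {
           List<Map<String, String>> metadata = List.of(
               Map.of(PARTITION_KEY, "2022/01/01", "op", "split"),
               Map.of(PARTITION_KEY, "2022/01/02", "op", "merge"),
               Map.of(PARTITION_KEY, "2022/01/01", "op", "sort"));
           // Two groups target 2022/01/01, but after grouping they are handled
           // by the one task that owns that key, so there is no concurrent writer.
           System.out.println(groupByPartition(metadata).get("2022/01/01").size());
       }
   }
   ```

   The "at most one clustering plan per partition" constraint then guarantees that no second concurrent plan can produce another task for the same key.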
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]