This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch release-0.10.1-rc1 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 91253ef05d2af82df2d530c847cd1440956b95e8 Author: yuzhaojing <32435329+yuzhaoj...@users.noreply.github.com> AuthorDate: Fri Dec 31 13:12:32 2021 +0800 [HUDI-3120] Cache compactionPlan in buffer (#4463) Co-authored-by: yuzhaojing <yuzhaoj...@bytedance.com> --- .../hudi/sink/compact/CompactionCommitSink.java | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java index 0309278..5312735 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java @@ -23,6 +23,7 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.sink.CleanFunction; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.util.CompactionUtil; @@ -68,6 +69,12 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> { private transient Map<String, Map<String, CompactionCommitEvent>> commitBuffer; /** + * Cache to store compaction plan for each instant. + * Stores the mapping of instant_time -> compactionPlan. + */ + private transient Map<String, HoodieCompactionPlan> compactionPlanCache; + + /** * The hoodie table. */ private transient HoodieFlinkTable<?> table; @@ -84,6 +91,7 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> { this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext()); } this.commitBuffer = new HashMap<>(); + this.compactionPlanCache = new HashMap<>(); this.table = this.writeClient.getHoodieTable(); } @@ -108,8 +116,15 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> { * @param events Commit events ever received for the instant */ private void commitIfNecessary(String instant, Collection<CompactionCommitEvent> events) throws IOException { - HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan( - this.writeClient.getHoodieTable().getMetaClient(), instant); + HoodieCompactionPlan compactionPlan = compactionPlanCache.computeIfAbsent(instant, k -> { + try { + return CompactionUtils.getCompactionPlan( + this.writeClient.getHoodieTable().getMetaClient(), instant); + } catch (IOException e) { + throw new HoodieException(e); + } + }); + boolean isReady = compactionPlan.getOperations().size() == events.size(); if (!isReady) { return; @@ -143,5 +158,6 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> { private void reset(String instant) { this.commitBuffer.remove(instant); + this.compactionPlanCache.remove(instant); } }