vinothchandar commented on code in PR #5269: URL: https://github.com/apache/hudi/pull/5269#discussion_r846787619
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseCompactor.java:
##########
@@ -31,16 +33,30 @@
   private static final long serialVersionUID = 1L;
+  protected final transient Object writeClientUpdateLock = new Object();
+  protected final transient List<BaseHoodieWriteClient<T, I, K, O>> oldCompactionClientList = new ArrayList<>();
+
   protected transient BaseHoodieWriteClient<T, I, K, O> compactionClient;
+  protected boolean isCompactionRunning = false;
+
   public BaseCompactor(BaseHoodieWriteClient<T, I, K, O> compactionClient) {
     this.compactionClient = compactionClient;
   }
   public abstract void compact(HoodieInstant instant) throws IOException;
   public void updateWriteClient(BaseHoodieWriteClient<T, I, K, O> writeClient) {
-    this.compactionClient = writeClient;
+    synchronized (writeClientUpdateLock) {
+      if (!isCompactionRunning) {
+        this.compactionClient.close();
+      } else {
+        // Store the old compaction client so that they can be closed

Review Comment:
Can't we just tie the life cycle of the write client to the life cycle of a single execution of clustering or compaction? I.e., at the DeltaSync level, when we submit the runnable for executing a given compaction or clustering, we create a new write client. Is that not sufficient, and simpler?
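A minimal sketch of the per-execution lifecycle suggested in the comment above. It assumes a hypothetical `WriteClient` interface standing in for `BaseHoodieWriteClient` and a hypothetical DeltaSync-level `AsyncCompactionService`; none of these names are Hudi APIs. Each submitted compaction builds its own client from a factory and closes it when that run ends, so `updateWriteClient` reduces to swapping the factory, and the lock, `oldCompactionClientList`, and `isCompactionRunning` are not needed.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class PerExecutionClientSketch {

  /** Hypothetical stand-in for BaseHoodieWriteClient; not a Hudi API. */
  interface WriteClient extends AutoCloseable {
    void compact(String instantTime);

    @Override
    void close(); // narrowed so callers need not handle a checked exception
  }

  /** Hypothetical DeltaSync-level service that runs one compaction per submission. */
  static final class AsyncCompactionService implements AutoCloseable {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    // volatile: a refresh on the ingestion thread is visible to the next submit call
    private volatile Supplier<? extends WriteClient> clientFactory;

    AsyncCompactionService(Supplier<? extends WriteClient> clientFactory) {
      this.clientFactory = clientFactory;
    }

    /** Takes the place of updateWriteClient(): affects only later submissions. */
    void updateClientFactory(Supplier<? extends WriteClient> newFactory) {
      this.clientFactory = newFactory;
    }

    Future<?> submitCompaction(String instantTime) {
      // Read the factory once, at submit time, so this run ignores later refreshes.
      Supplier<? extends WriteClient> factory = clientFactory;
      return executor.submit(() -> {
        // The client is created inside the runnable and closed when this run ends,
        // even if compact() throws; no other thread ever sees it.
        try (WriteClient client = factory.get()) {
          client.compact(instantTime);
        }
      });
    }

    @Override
    public void close() {
      executor.shutdown(); // already-queued runs still execute
      try {
        executor.awaitTermination(1, TimeUnit.MINUTES);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }
}
```

In this sketch the ingestion loop would call `updateClientFactory(...)` whenever the write config changes and `submitCompaction(instant)` per pending instant. The trade-off is constructing a client per run, in exchange for dropping the shared mutable client and the locking around it.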
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseCompactor.java:
##########
@@ -31,16 +33,30 @@
   private static final long serialVersionUID = 1L;
+  protected final transient Object writeClientUpdateLock = new Object();
+  protected final transient List<BaseHoodieWriteClient<T, I, K, O>> oldCompactionClientList = new ArrayList<>();
+
   protected transient BaseHoodieWriteClient<T, I, K, O> compactionClient;
+  protected boolean isCompactionRunning = false;
+
   public BaseCompactor(BaseHoodieWriteClient<T, I, K, O> compactionClient) {
     this.compactionClient = compactionClient;
   }
   public abstract void compact(HoodieInstant instant) throws IOException;
   public void updateWriteClient(BaseHoodieWriteClient<T, I, K, O> writeClient) {
-    this.compactionClient = writeClient;
+    synchronized (writeClientUpdateLock) {

Review Comment:
Reuse code?

##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkCompactor.java:
##########
@@ -60,5 +64,12 @@ public void compact(HoodieInstant instant) {
     }
     // Commit compaction
     writeClient.commitCompaction(instant.getTimestamp(), compactionMetadata.getCommitMetadata().get(), Option.empty());
+    synchronized (writeClientUpdateLock) {

Review Comment:
Not a fan of all this concurrency handling here. IMO it's harder to read.