vinothchandar commented on code in PR #5269:
URL: https://github.com/apache/hudi/pull/5269#discussion_r846787619


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseCompactor.java:
##########
@@ -31,16 +33,30 @@
 
   private static final long serialVersionUID = 1L;
 
+  protected final transient Object writeClientUpdateLock = new Object();
+  protected final transient List<BaseHoodieWriteClient<T, I, K, O>> oldCompactionClientList = new ArrayList<>();
+
   protected transient BaseHoodieWriteClient<T, I, K, O> compactionClient;
 
+  protected boolean isCompactionRunning = false;
+
   public BaseCompactor(BaseHoodieWriteClient<T, I, K, O> compactionClient) {
     this.compactionClient = compactionClient;
   }
 
   public abstract void compact(HoodieInstant instant) throws IOException;
 
   public void updateWriteClient(BaseHoodieWriteClient<T, I, K, O> writeClient) {
-    this.compactionClient = writeClient;
+    synchronized (writeClientUpdateLock) {
+      if (!isCompactionRunning) {
+        this.compactionClient.close();
+      } else {
+        // Store the old compaction client so that they can be closed

Review Comment:
   Can't we just tie the life cycle of the write client to the life cycle of a
single execution of clustering or compaction? I.e., at the DeltaSync level, when we
submit the runnable for executing a given compaction or clustering, we make a
new write client. Is that not sufficient, and simpler?
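
   A minimal sketch of that suggestion, not the actual Hudi code: `WriteClient`, `newClient`, and `submitCompaction` are hypothetical stand-ins, and the real `BaseHoodieWriteClient` and DeltaSync wiring would differ. Each submitted run opens its own client and closes it when the run ends, so no shared lock or list of old clients is needed.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;

public class PerRunClientSketch {

  /** Hypothetical stand-in for a write client; NOT the Hudi class. */
  interface WriteClient extends AutoCloseable {
    void compact(String instantTime);

    @Override
    void close(); // narrowed so callers need not handle a checked exception
  }

  /** Records lifecycle events so the open/compact/close ordering is visible. */
  static final List<String> EVENTS = new CopyOnWriteArrayList<>();

  /** Hypothetical factory: builds a fresh client for exactly one run. */
  static WriteClient newClient() {
    EVENTS.add("open");
    return new WriteClient() {
      @Override
      public void compact(String instantTime) {
        EVENTS.add("compact " + instantTime);
      }

      @Override
      public void close() {
        EVENTS.add("close");
      }
    };
  }

  /** Submits one compaction; the client lives only as long as this runnable. */
  static Future<?> submitCompaction(ExecutorService pool,
                                    Supplier<WriteClient> clientFactory,
                                    String instantTime) {
    return pool.submit(() -> {
      // try-with-resources closes the client even if compact() throws
      try (WriteClient client = clientFactory.get()) {
        client.compact(instantTime);
      }
    });
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      submitCompaction(pool, PerRunClientSketch::newClient, "001").get();
      submitCompaction(pool, PerRunClientSketch::newClient, "002").get();
    } finally {
      pool.shutdown();
    }
    // prints [open, compact 001, close, open, compact 002, close]
    System.out.println(EVENTS);
  }
}
```

   Because the client never outlives the run that created it, a config change only affects the next submitted run, and there is nothing to synchronize on.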
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseCompactor.java:
##########
@@ -31,16 +33,30 @@
 
   private static final long serialVersionUID = 1L;
 
+  protected final transient Object writeClientUpdateLock = new Object();
+  protected final transient List<BaseHoodieWriteClient<T, I, K, O>> oldCompactionClientList = new ArrayList<>();
+
   protected transient BaseHoodieWriteClient<T, I, K, O> compactionClient;
 
+  protected boolean isCompactionRunning = false;
+
   public BaseCompactor(BaseHoodieWriteClient<T, I, K, O> compactionClient) {
     this.compactionClient = compactionClient;
   }
 
   public abstract void compact(HoodieInstant instant) throws IOException;
 
   public void updateWriteClient(BaseHoodieWriteClient<T, I, K, O> writeClient) {
-    this.compactionClient = writeClient;
+    synchronized (writeClientUpdateLock) {

Review Comment:
   reuse code?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkCompactor.java:
##########
@@ -60,5 +64,12 @@ public void compact(HoodieInstant instant) {
     }
     // Commit compaction
     writeClient.commitCompaction(instant.getTimestamp(), compactionMetadata.getCommitMetadata().get(), Option.empty());
+
+    synchronized (writeClientUpdateLock) {

Review Comment:
   Not a fan of all this concurrency handling here. IMO it makes the code harder to read.



