This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch release-0.10.1-rc1 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit d91bb0e7eb94a416aae1f9d236147c776354d8d0 Author: Sivabalan Narayanan <[email protected]> AuthorDate: Wed Dec 29 21:45:09 2021 -0500 Revert "[HUDI-3043] Revert async cleaner leak commit to unblock CI failure (#4343)" (#4465) This reverts commit 7e7ad1558c0dcc06e059f631e43e44dc04100aa4. --- .../org/apache/hudi/async/HoodieAsyncService.java | 36 +++++----------------- .../hudi/client/AbstractHoodieWriteClient.java | 6 +++- .../apache/hudi/client/AsyncCleanerService.java | 14 ++++----- .../apache/hudi/client/HoodieFlinkWriteClient.java | 6 +++- .../java/org/apache/hudi/sink/CleanFunction.java | 7 +++++ 5 files changed, 31 insertions(+), 38 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java index 85e0081..f57484d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java @@ -29,7 +29,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; @@ -130,7 +129,7 @@ public abstract class HoodieAsyncService implements Serializable { future = res.getKey(); executor = res.getValue(); started = true; - monitorThreads(onShutdownCallback); + shutdownCallback(onShutdownCallback); } /** @@ -141,34 +140,15 @@ public abstract class HoodieAsyncService implements Serializable { protected abstract Pair<CompletableFuture, ExecutorService> startService(); /** - * A monitor thread is started which would trigger a callback if the service is shutdown. + * Add shutdown callback for the completable future. * - * @param onShutdownCallback + * @param callback The callback */ - private void monitorThreads(Function<Boolean, Boolean> onShutdownCallback) { - LOG.info("Submitting monitor thread !!"); - Executors.newSingleThreadExecutor(r -> { - Thread t = new Thread(r, "Monitor Thread"); - t.setDaemon(isRunInDaemonMode()); - return t; - }).submit(() -> { - boolean error = false; - try { - LOG.info("Monitoring thread(s) !!"); - future.get(); - } catch (ExecutionException ex) { - LOG.error("Monitor noticed one or more threads failed. Requesting graceful shutdown of other threads", ex); - error = true; - } catch (InterruptedException ie) { - LOG.error("Got interrupted Monitoring threads", ie); - error = true; - } finally { - // Mark as shutdown - shutdown = true; - if (null != onShutdownCallback) { - onShutdownCallback.apply(error); - } - shutdown(false); + @SuppressWarnings("unchecked") + private void shutdownCallback(Function<Boolean, Boolean> callback) { + future.whenComplete((resp, error) -> { + if (null != callback) { + callback.apply(null != error); } }); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 396023a..76b10fd 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -425,7 +425,11 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I HoodieTableMetaClient metaClient) { setOperationType(writeOperationType); this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient); - this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); + if (null == this.asyncCleanerService) { + this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); + } else { + this.asyncCleanerService.start(null); + } } /** diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AsyncCleanerService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AsyncCleanerService.java index 2fd4251..a5a38f2 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AsyncCleanerService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AsyncCleanerService.java @@ -37,28 +37,26 @@ class AsyncCleanerService extends HoodieAsyncService { private static final Logger LOG = LogManager.getLogger(AsyncCleanerService.class); private final AbstractHoodieWriteClient writeClient; - private final String cleanInstantTime; private final transient ExecutorService executor = Executors.newSingleThreadExecutor(); - protected AsyncCleanerService(AbstractHoodieWriteClient writeClient, String cleanInstantTime) { + protected AsyncCleanerService(AbstractHoodieWriteClient writeClient) { this.writeClient = writeClient; - this.cleanInstantTime = cleanInstantTime; } @Override protected Pair<CompletableFuture, ExecutorService> startService() { + String instantTime = HoodieActiveTimeline.createNewInstantTime(); + LOG.info("Auto cleaning is enabled. Running cleaner async to write operation at instant time " + instantTime); return Pair.of(CompletableFuture.supplyAsync(() -> { - writeClient.clean(cleanInstantTime); + writeClient.clean(instantTime); return true; - }), executor); + }, executor), executor); } public static AsyncCleanerService startAsyncCleaningIfEnabled(AbstractHoodieWriteClient writeClient) { AsyncCleanerService asyncCleanerService = null; if (writeClient.getConfig().isAutoClean() && writeClient.getConfig().isAsyncClean()) { - String instantTime = HoodieActiveTimeline.createNewInstantTime(); - LOG.info("Auto cleaning is enabled. Running cleaner async to write operation at instant time " + instantTime); - asyncCleanerService = new AsyncCleanerService(writeClient, instantTime); + asyncCleanerService = new AsyncCleanerService(writeClient); asyncCleanerService.start(null); } else { LOG.info("Async auto cleaning is not enabled. Not running cleaner now"); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 2ed2536..4108ba4 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -281,7 +281,11 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends * checkpoint finish. */ public void startAsyncCleaning() { - this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); + if (this.asyncCleanerService == null) { + this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); + } else { + this.asyncCleanerService.start(null); + } } /** diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java index 13154b2..195e430 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java @@ -98,4 +98,11 @@ public class CleanFunction<T> extends AbstractRichFunction public void initializeState(FunctionInitializationContext context) throws Exception { // no operation } + + @Override + public void close() throws Exception { + if (this.writeClient != null) { + this.writeClient.close(); + } + } }
