dawidwys commented on a change in pull request #18745: URL: https://github.com/apache/flink/pull/18745#discussion_r807684936
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java ########## @@ -95,8 +100,94 @@ private ComponentClosingUtils() {} return future; } + /** + * A util method that tries to shut down an {@link ExecutorService} elegantly within the given + * timeout. If the executor has not been shut down before it hits timeout or the thread is + * interrupted when waiting for the termination, a forceful shutdown will be attempted on the + * executor. + * + * @param executor the {@link ExecutorService} to shut down. + * @param timeout the timeout duration. + * @return true if the given executor has been successfully closed, false otherwise. + */ + @SuppressWarnings("ResultOfMethodCallIgnored") + public static boolean tryShutdownExecutorElegantly(ExecutorService executor, Duration timeout) { + try { + executor.shutdown(); + executor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException ie) { + // Let it go. + } + if (!executor.isTerminated()) { + shutdownExecutorForcefully(executor, Duration.ZERO, false); + } + return executor.isTerminated(); + } + + /** + * Shutdown the given executor forcefully within the given timeout. The method returns if it is + * interrupted. + * + * @param executor the executor to shut down. + * @param timeout the timeout duration. + * @return true if the given executor is terminated, false otherwise. + */ + public static boolean shutdownExecutorForcefully(ExecutorService executor, Duration timeout) { + return shutdownExecutorForcefully(executor, timeout, true); + } + + /** + * Shutdown the given executor forcefully within the given timeout. + * + * @param executor the executor to shut down. + * @param timeout the timeout duration. + * @param interruptable when set to true, the method cannot be interrupted. Each interruption to Review comment: I'd expect it to behave the other way around. 
If something is interruptable, I'd expect it can be properly interrupted. The way I understand it, now if the flag is true (interruptable), you cannot interrupt this method. EDIT: Is it just that the javadoc is wrong? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java ########## @@ -82,7 +84,7 @@ Licensed to the Apache Software Foundation (ASF) under one private static final Logger LOG = LoggerFactory.getLogger(SourceCoordinatorContext.class); - private final ExecutorService coordinatorExecutor; + private final ScheduledExecutorService coordinatorExecutor; Review comment: nit: Could we create the `coordinatorExecutor` in the ctor? It's a bit weird now that we close a `coordinatorExecutor` that we are given from the outside of the class. As far as I checked it's always a new service created in the `SourceCoordinatorProvider`. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java ########## @@ -95,8 +100,94 @@ private ComponentClosingUtils() {} return future; } + /** + * A util method that tries to shut down an {@link ExecutorService} elegantly within the given + * timeout. If the executor has not been shut down before it hits timeout or the thread is + * interrupted when waiting for the termination, a forceful shutdown will be attempted on the + * executor. + * + * @param executor the {@link ExecutorService} to shut down. + * @param timeout the timeout duration. + * @return true if the given executor has been successfully closed, false otherwise. + */ + @SuppressWarnings("ResultOfMethodCallIgnored") + public static boolean tryShutdownExecutorElegantly(ExecutorService executor, Duration timeout) { + try { + executor.shutdown(); + executor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException ie) { + // Let it go. 
+ } + if (!executor.isTerminated()) { + shutdownExecutorForcefully(executor, Duration.ZERO, false); + } + return executor.isTerminated(); + } + + /** + * Shutdown the given executor forcefully within the given timeout. The method returns if it is + * interrupted. + * + * @param executor the executor to shut down. + * @param timeout the timeout duration. + * @return true if the given executor is terminated, false otherwise. + */ + public static boolean shutdownExecutorForcefully(ExecutorService executor, Duration timeout) { + return shutdownExecutorForcefully(executor, timeout, true); + } + + /** + * Shutdown the given executor forcefully within the given timeout. + * + * @param executor the executor to shut down. + * @param timeout the timeout duration. + * @param interruptable when set to true, the method cannot be interrupted. Each interruption to + * the thread results in another {@code ExecutorService.shutdownNow()} call to the shutting + * down executor. + * @return true if the given executor is terminated, false otherwise. + */ + public static boolean shutdownExecutorForcefully( + ExecutorService executor, Duration timeout, boolean interruptable) { + long startingTime = clock.relativeTimeMillis(); + long timeElapsed = 0L; + boolean isInterrupted = false; + do { + executor.shutdownNow(); + try { + long timeRemaining = timeout.toMillis() - timeElapsed; + executor.awaitTermination(timeRemaining, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + isInterrupted = interruptable; + } + timeElapsed = clock.relativeTimeMillis() - startingTime; + } while (!executor.isTerminated() && !isInterrupted && timeElapsed < timeout.toMillis()); + return executor.isTerminated(); + } + static void abortThread(Thread t) { - // the abortion strategy is pretty simple here... - t.interrupt(); + // Try our best here to ensure the thread is aborted. Keep interrupting the + // thread for 10 times with 10 ms intervals. 
This helps handle the case + // where the shutdown sequence consists of a bunch of closeQuietly() calls + // that will swallow the InterruptedException so the thread to be aborted + // may block multiple times. If the thread is still alive after all the + // attempts, just let it go. The caller of closeAsyncWithTimeout() should + // have received a TimeoutException in this case. + int i = 0; + while (t.isAlive() && i < 10) { + t.interrupt(); + i++; + try { + Thread.sleep(10); + } catch (InterruptedException e) { + // Let it go. + } + } + } + + // ========= Method visible for testing ======== + + @VisibleForTesting + static void setClock(Clock clock) { Review comment: Can we have it as a parameter in the methods that use it instead? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java ########## @@ -153,7 +156,6 @@ public void close() throws InterruptedException { return; } // Shutdown the worker executor, so no more worker tasks can run. - workerExecutor.shutdownNow(); - workerExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); + shutdownExecutorForcefully(workerExecutor, Duration.ofMillis(Long.MAX_VALUE)); Review comment: nit: I find it equally weird, similarly as in `SourceCoordinatorContext`, that we close something, given from the outside. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java ########## @@ -95,8 +100,94 @@ private ComponentClosingUtils() {} return future; } + /** + * A util method that tries to shut down an {@link ExecutorService} elegantly within the given + * timeout. If the executor has not been shut down before it hits timeout or the thread is + * interrupted when waiting for the termination, a forceful shutdown will be attempted on the + * executor. + * + * @param executor the {@link ExecutorService} to shut down. + * @param timeout the timeout duration. 
+ * @return true if the given executor has been successfully closed, false otherwise. + */ + @SuppressWarnings("ResultOfMethodCallIgnored") + public static boolean tryShutdownExecutorElegantly(ExecutorService executor, Duration timeout) { + try { + executor.shutdown(); + executor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException ie) { + // Let it go. + } + if (!executor.isTerminated()) { + shutdownExecutorForcefully(executor, Duration.ZERO, false); + } + return executor.isTerminated(); + } + + /** + * Shutdown the given executor forcefully within the given timeout. The method returns if it is + * interrupted. + * + * @param executor the executor to shut down. + * @param timeout the timeout duration. + * @return true if the given executor is terminated, false otherwise. + */ + public static boolean shutdownExecutorForcefully(ExecutorService executor, Duration timeout) { + return shutdownExecutorForcefully(executor, timeout, true); + } + + /** + * Shutdown the given executor forcefully within the given timeout. + * + * @param executor the executor to shut down. + * @param timeout the timeout duration. + * @param interruptable when set to true, the method cannot be interrupted. Each interruption to + * the thread results in another {@code ExecutorService.shutdownNow()} call to the shutting + * down executor. + * @return true if the given executor is terminated, false otherwise. 
+ */ + public static boolean shutdownExecutorForcefully( + ExecutorService executor, Duration timeout, boolean interruptable) { + long startingTime = clock.relativeTimeMillis(); + long timeElapsed = 0L; + boolean isInterrupted = false; + do { + executor.shutdownNow(); + try { + long timeRemaining = timeout.toMillis() - timeElapsed; + executor.awaitTermination(timeRemaining, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + isInterrupted = interruptable; + } + timeElapsed = clock.relativeTimeMillis() - startingTime; + } while (!executor.isTerminated() && !isInterrupted && timeElapsed < timeout.toMillis()); + return executor.isTerminated(); + } + static void abortThread(Thread t) { Review comment: Why is it not private? ########## File path: flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java ########## @@ -113,7 +113,7 @@ public boolean isTerminated() { } @Override - public boolean awaitTermination(long timeout, TimeUnit unit) { + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { Review comment: That's an unnecessary change, isn't it? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java ########## @@ -217,18 +207,11 @@ public void start() throws Exception { @Override public void close() throws Exception { LOG.info("Closing SourceCoordinator for source {}.", operatorName); - try { - if (started) { - context.close(); - if (enumerator != null) { - enumerator.close(); - } + if (started) { + closeQuietly(context); Review comment: Could we use `IOUtils.closeAll` instead? That way we would not lose the exception from `context.close()`. 
``` IOUtils.closeAll(context, enumerator); ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java ########## @@ -259,9 +265,9 @@ public void runInCoordinatorThread(Runnable runnable) { @Override public void close() throws InterruptedException { closed = true; - notifier.close(); - coordinatorExecutor.shutdown(); - coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + // Close quietly so the closing sequence will be executed completely. + closeQuietly(notifier); Review comment: Can we use `IOUtils.closeAll` here as well? ``` IOUtils.closeAll( notifier, () -> shutdownExecutorForcefully( coordinatorExecutor, Duration.ofMillis(Long.MAX_VALUE))); ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java ########## @@ -95,8 +100,94 @@ private ComponentClosingUtils() {} return future; } + /** + * A util method that tries to shut down an {@link ExecutorService} elegantly within the given + * timeout. If the executor has not been shut down before it hits timeout or the thread is + * interrupted when waiting for the termination, a forceful shutdown will be attempted on the + * executor. + * + * @param executor the {@link ExecutorService} to shut down. + * @param timeout the timeout duration. + * @return true if the given executor has been successfully closed, false otherwise. + */ + @SuppressWarnings("ResultOfMethodCallIgnored") + public static boolean tryShutdownExecutorElegantly(ExecutorService executor, Duration timeout) { + try { + executor.shutdown(); + executor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException ie) { + // Let it go. + } + if (!executor.isTerminated()) { + shutdownExecutorForcefully(executor, Duration.ZERO, false); + } + return executor.isTerminated(); + } + + /** + * Shutdown the given executor forcefully within the given timeout. 
The method returns if it is + * interrupted. + * + * @param executor the executor to shut down. + * @param timeout the timeout duration. + * @return true if the given executor is terminated, false otherwise. + */ + public static boolean shutdownExecutorForcefully(ExecutorService executor, Duration timeout) { + return shutdownExecutorForcefully(executor, timeout, true); + } + + /** + * Shutdown the given executor forcefully within the given timeout. + * + * @param executor the executor to shut down. + * @param timeout the timeout duration. + * @param interruptable when set to true, the method cannot be interrupted. Each interruption to + * the thread results in another {@code ExecutorService.shutdownNow()} call to the shutting + * down executor. + * @return true if the given executor is terminated, false otherwise. + */ + public static boolean shutdownExecutorForcefully( Review comment: Could we rewrite it using `Deadline`? ``` public static boolean shutdownExecutorForcefully( ExecutorService executor, Duration timeout, boolean interruptable) { boolean isInterrupted = false; final Deadline deadline = Deadline.fromNowWithClock(timeout, clock); do { executor.shutdownNow(); try { executor.awaitTermination(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { isInterrupted = interruptable; } } while (!executor.isTerminated() && !isInterrupted && deadline.hasTimeLeft()); return executor.isTerminated(); } ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java ########## @@ -95,8 +100,94 @@ private ComponentClosingUtils() {} return future; } + /** + * A util method that tries to shut down an {@link ExecutorService} elegantly within the given + * timeout. If the executor has not been shut down before it hits timeout or the thread is + * interrupted when waiting for the termination, a forceful shutdown will be attempted on the + * executor. 
+ * + * @param executor the {@link ExecutorService} to shut down. + * @param timeout the timeout duration. + * @return true if the given executor has been successfully closed, false otherwise. + */ + @SuppressWarnings("ResultOfMethodCallIgnored") + public static boolean tryShutdownExecutorElegantly(ExecutorService executor, Duration timeout) { + try { + executor.shutdown(); + executor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException ie) { + // Let it go. + } + if (!executor.isTerminated()) { + shutdownExecutorForcefully(executor, Duration.ZERO, false); + } + return executor.isTerminated(); + } + + /** + * Shutdown the given executor forcefully within the given timeout. The method returns if it is + * interrupted. + * + * @param executor the executor to shut down. + * @param timeout the timeout duration. + * @return true if the given executor is terminated, false otherwise. + */ + public static boolean shutdownExecutorForcefully(ExecutorService executor, Duration timeout) { + return shutdownExecutorForcefully(executor, timeout, true); + } + + /** + * Shutdown the given executor forcefully within the given timeout. + * + * @param executor the executor to shut down. + * @param timeout the timeout duration. + * @param interruptable when set to true, the method cannot be interrupted. Each interruption to + * the thread results in another {@code ExecutorService.shutdownNow()} call to the shutting + * down executor. + * @return true if the given executor is terminated, false otherwise. 
+ */ + public static boolean shutdownExecutorForcefully( + ExecutorService executor, Duration timeout, boolean interruptable) { + long startingTime = clock.relativeTimeMillis(); + long timeElapsed = 0L; + boolean isInterrupted = false; + do { + executor.shutdownNow(); + try { + long timeRemaining = timeout.toMillis() - timeElapsed; + executor.awaitTermination(timeRemaining, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + isInterrupted = interruptable; + } + timeElapsed = clock.relativeTimeMillis() - startingTime; + } while (!executor.isTerminated() && !isInterrupted && timeElapsed < timeout.toMillis()); Review comment: nit: How about we change the order? I guess `executor.isTerminated()` is the most expensive operation and we do not need to execute it here if other conditions are not met. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org