dawidwys commented on a change in pull request #18745:
URL: https://github.com/apache/flink/pull/18745#discussion_r807684936



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
##########
@@ -95,8 +100,94 @@ private ComponentClosingUtils() {}
         return future;
     }
 
+    /**
+     * A util method that tries to shut down an {@link ExecutorService} elegantly within the given
+     * timeout. If the executor has not been shut down before it hits timeout or the thread is
+     * interrupted when waiting for the termination, a forceful shutdown will be attempted on the
+     * executor.
+     *
+     * @param executor the {@link ExecutorService} to shut down.
+     * @param timeout the timeout duration.
+     * @return true if the given executor has been successfully closed, false otherwise.
+     */
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    public static boolean tryShutdownExecutorElegantly(ExecutorService executor, Duration timeout) {
+        try {
+            executor.shutdown();
+            executor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
+        } catch (InterruptedException ie) {
+            // Let it go.
+        }
+        if (!executor.isTerminated()) {
+            shutdownExecutorForcefully(executor, Duration.ZERO, false);
+        }
+        return executor.isTerminated();
+    }
+
+    /**
+     * Shutdown the given executor forcefully within the given timeout. The method returns if it is
+     * interrupted.
+     *
+     * @param executor the executor to shut down.
+     * @param timeout the timeout duration.
+     * @return true if the given executor is terminated, false otherwise.
+     */
+    public static boolean shutdownExecutorForcefully(ExecutorService executor, Duration timeout) {
+        return shutdownExecutorForcefully(executor, timeout, true);
+    }
+
+    /**
+     * Shutdown the given executor forcefully within the given timeout.
+     *
+     * @param executor the executor to shut down.
+     * @param timeout the timeout duration.
+     * @param interruptable when set to true, the method cannot be interrupted. Each interruption to

Review comment:
       I'd expect it to behave the other way around. If something is interruptable, I'd expect that it can be properly interrupted. The way I understand it, if the flag is true (interruptable), you currently cannot interrupt this method.
   
   EDIT: Or is it just that the javadoc is wrong?
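Reading the loop body of `shutdownExecutorForcefully(ExecutorService, Duration, boolean)`, the line `isInterrupted = interruptable` means that with `interruptable == true` an `InterruptedException` ends the loop and the method returns. With `false`, the interrupt is swallowed and `shutdownNow()` is retried until the timeout. So the code seems to match the flag name, and it is the javadoc that has the meaning inverted. Below is a minimal sketch to check this. It assumes `System.nanoTime()` as a stand-in for Flink's `clock`, and the class and helper names are hypothetical.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class InterruptableSemanticsSketch {

    /** Same loop as under review; System.nanoTime() stands in for Flink's clock. */
    static boolean shutdownExecutorForcefully(
            ExecutorService executor, Duration timeout, boolean interruptable) {
        long start = System.nanoTime();
        long elapsed = 0L;
        boolean isInterrupted = false;
        do {
            executor.shutdownNow();
            try {
                executor.awaitTermination(timeout.toNanos() - elapsed, TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                // true: give up on interrupt; false: swallow it and retry shutdownNow()
                isInterrupted = interruptable;
            }
            elapsed = System.nanoTime() - start;
        } while (!executor.isTerminated() && !isInterrupted && elapsed < timeout.toNanos());
        return executor.isTerminated();
    }

    /** Interrupts the caller, then measures how long the call takes on an executor that never dies. */
    static long millisUntilReturn(boolean interruptable, Duration timeout) {
        AtomicBoolean stop = new AtomicBoolean(false);
        CountDownLatch started = new CountDownLatch(1);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(() -> {
            started.countDown();
            while (!stop.get()) {
                Thread.interrupted(); // ignore the interrupts sent by shutdownNow()
                LockSupport.parkNanos(1_000_000L);
            }
        });
        try {
            started.await();
            long begin = System.nanoTime();
            Thread.currentThread().interrupt();
            shutdownExecutorForcefully(executor, timeout, interruptable);
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            Thread.interrupted(); // clear any interrupt left over from the experiment
            stop.set(true); // let the task finish so the executor can terminate
        }
    }
}
```

With `interruptable == true`, the call returns almost immediately after the interrupt. With `false`, it keeps going until the timeout, which is the behavior the javadoc currently attributes to `true`.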

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
##########
@@ -82,7 +84,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 
     private static final Logger LOG = LoggerFactory.getLogger(SourceCoordinatorContext.class);
 
-    private final ExecutorService coordinatorExecutor;
+    private final ScheduledExecutorService coordinatorExecutor;

Review comment:
       nit: Could we create the `coordinatorExecutor` in the ctor? It's a bit weird that we now close a `coordinatorExecutor` that is passed in from outside the class. As far as I checked, it's always a new service created in the `SourceCoordinatorProvider`.
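Below is a minimal sketch of creating the executor in the constructor. With this approach, the class that closes the executor is also the one that created it. `OwningCoordinatorContext` and its thread-name parameter are hypothetical. The sketch only illustrates the ownership pattern, not the actual `SourceCoordinatorContext` constructor.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical context that creates, and therefore also closes, its own executor. */
class OwningCoordinatorContext implements AutoCloseable {

    private final ScheduledExecutorService coordinatorExecutor;

    OwningCoordinatorContext(String coordinatorThreadName) {
        // Created here instead of being injected, so the lifecycle stays inside this class.
        this.coordinatorExecutor =
                Executors.newSingleThreadScheduledExecutor(
                        runnable -> {
                            Thread thread = new Thread(runnable, coordinatorThreadName);
                            thread.setDaemon(true);
                            return thread;
                        });
    }

    void runInCoordinatorThread(Runnable runnable) {
        coordinatorExecutor.execute(runnable);
    }

    boolean isExecutorTerminated() {
        return coordinatorExecutor.isTerminated();
    }

    @Override
    public void close() {
        coordinatorExecutor.shutdownNow();
        try {
            coordinatorExecutor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }
}
```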

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
##########
@@ -95,8 +100,94 @@ private ComponentClosingUtils() {}
         return future;
     }
 
+    /**
+     * A util method that tries to shut down an {@link ExecutorService} elegantly within the given
+     * timeout. If the executor has not been shut down before it hits timeout or the thread is
+     * interrupted when waiting for the termination, a forceful shutdown will be attempted on the
+     * executor.
+     *
+     * @param executor the {@link ExecutorService} to shut down.
+     * @param timeout the timeout duration.
+     * @return true if the given executor has been successfully closed, false otherwise.
+     */
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    public static boolean tryShutdownExecutorElegantly(ExecutorService executor, Duration timeout) {
+        try {
+            executor.shutdown();
+            executor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
+        } catch (InterruptedException ie) {
+            // Let it go.
+        }
+        if (!executor.isTerminated()) {
+            shutdownExecutorForcefully(executor, Duration.ZERO, false);
+        }
+        return executor.isTerminated();
+    }
+
+    /**
+     * Shutdown the given executor forcefully within the given timeout. The method returns if it is
+     * interrupted.
+     *
+     * @param executor the executor to shut down.
+     * @param timeout the timeout duration.
+     * @return true if the given executor is terminated, false otherwise.
+     */
+    public static boolean shutdownExecutorForcefully(ExecutorService executor, Duration timeout) {
+        return shutdownExecutorForcefully(executor, timeout, true);
+    }
+
+    /**
+     * Shutdown the given executor forcefully within the given timeout.
+     *
+     * @param executor the executor to shut down.
+     * @param timeout the timeout duration.
+     * @param interruptable when set to true, the method cannot be interrupted. Each interruption to
+     *     the thread results in another {@code ExecutorService.shutdownNow()} call to the shutting
+     *     down executor.
+     * @return true if the given executor is terminated, false otherwise.
+     */
+    public static boolean shutdownExecutorForcefully(
+            ExecutorService executor, Duration timeout, boolean interruptable) {
+        long startingTime = clock.relativeTimeMillis();
+        long timeElapsed = 0L;
+        boolean isInterrupted = false;
+        do {
+            executor.shutdownNow();
+            try {
+                long timeRemaining = timeout.toMillis() - timeElapsed;
+                executor.awaitTermination(timeRemaining, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                isInterrupted = interruptable;
+            }
+            timeElapsed = clock.relativeTimeMillis() - startingTime;
+        } while (!executor.isTerminated() && !isInterrupted && timeElapsed < timeout.toMillis());
+        return executor.isTerminated();
+    }
+
     static void abortThread(Thread t) {
-        // the abortion strategy is pretty simple here...
-        t.interrupt();
+        // Try our best here to ensure the thread is aborted. Keep interrupting the
+        // thread for 10 times with 10 ms intervals. This helps handle the case
+        // where the shutdown sequence consists of a bunch of closeQuietly() calls
+        // that will swallow the InterruptedException so the thread to be aborted
+        // may block multiple times. If the thread is still alive after all the
+        // attempts, just let it go. The caller of closeAsyncWithTimeout() should
+        // have received a TimeoutException in this case.
+        int i = 0;
+        while (t.isAlive() && i < 10) {
+            t.interrupt();
+            i++;
+            try {
+                Thread.sleep(10);
+            } catch (InterruptedException e) {
+                // Let it go.
+            }
+        }
+    }
+
+    // ========= Method visible for testing ========
+
+    @VisibleForTesting
+    static void setClock(Clock clock) {

Review comment:
       Can we have it as a parameter in the methods that use it instead? 
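Below is a minimal sketch of passing the time source into the method instead of setting a static field through `setClock`. A `java.util.function.LongSupplier` stands in for Flink's `Clock#relativeTimeMillis()`, and `ClockParameterSketch` is a hypothetical name. The two-argument overload keeps its shape and supplies a real monotonic time source.

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

class ClockParameterSketch {

    /** Production overload: same shape as before, supplies a real monotonic time source. */
    static boolean shutdownExecutorForcefully(ExecutorService executor, Duration timeout) {
        return shutdownExecutorForcefully(
                executor, timeout, true, () -> TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
    }

    /** The time source is a parameter, so tests need no static setClock(). */
    static boolean shutdownExecutorForcefully(
            ExecutorService executor,
            Duration timeout,
            boolean interruptable,
            LongSupplier relativeTimeMillis) {
        long startingTime = relativeTimeMillis.getAsLong();
        long timeElapsed = 0L;
        boolean isInterrupted = false;
        do {
            executor.shutdownNow();
            try {
                executor.awaitTermination(timeout.toMillis() - timeElapsed, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                isInterrupted = interruptable;
            }
            timeElapsed = relativeTimeMillis.getAsLong() - startingTime;
        } while (!executor.isTerminated() && !isInterrupted && timeElapsed < timeout.toMillis());
        return executor.isTerminated();
    }
}
```

A test can then hand in a fake supplier directly, and tests no longer share mutable static state.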

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
##########
@@ -153,7 +156,6 @@ public void close() throws InterruptedException {
             return;
         }
         // Shutdown the worker executor, so no more worker tasks can run.
-        workerExecutor.shutdownNow();
-        workerExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+        shutdownExecutorForcefully(workerExecutor, Duration.ofMillis(Long.MAX_VALUE));

Review comment:
       nit: As in `SourceCoordinatorContext`, I find it weird that we close something that is given to us from the outside.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
##########
@@ -95,8 +100,94 @@ private ComponentClosingUtils() {}
         return future;
     }
 
+    /**
+     * A util method that tries to shut down an {@link ExecutorService} elegantly within the given
+     * timeout. If the executor has not been shut down before it hits timeout or the thread is
+     * interrupted when waiting for the termination, a forceful shutdown will be attempted on the
+     * executor.
+     *
+     * @param executor the {@link ExecutorService} to shut down.
+     * @param timeout the timeout duration.
+     * @return true if the given executor has been successfully closed, false otherwise.
+     */
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    public static boolean tryShutdownExecutorElegantly(ExecutorService executor, Duration timeout) {
+        try {
+            executor.shutdown();
+            executor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
+        } catch (InterruptedException ie) {
+            // Let it go.
+        }
+        if (!executor.isTerminated()) {
+            shutdownExecutorForcefully(executor, Duration.ZERO, false);
+        }
+        return executor.isTerminated();
+    }
+
+    /**
+     * Shutdown the given executor forcefully within the given timeout. The method returns if it is
+     * interrupted.
+     *
+     * @param executor the executor to shut down.
+     * @param timeout the timeout duration.
+     * @return true if the given executor is terminated, false otherwise.
+     */
+    public static boolean shutdownExecutorForcefully(ExecutorService executor, Duration timeout) {
+        return shutdownExecutorForcefully(executor, timeout, true);
+    }
+
+    /**
+     * Shutdown the given executor forcefully within the given timeout.
+     *
+     * @param executor the executor to shut down.
+     * @param timeout the timeout duration.
+     * @param interruptable when set to true, the method cannot be interrupted. Each interruption to
+     *     the thread results in another {@code ExecutorService.shutdownNow()} call to the shutting
+     *     down executor.
+     * @return true if the given executor is terminated, false otherwise.
+     */
+    public static boolean shutdownExecutorForcefully(
+            ExecutorService executor, Duration timeout, boolean interruptable) {
+        long startingTime = clock.relativeTimeMillis();
+        long timeElapsed = 0L;
+        boolean isInterrupted = false;
+        do {
+            executor.shutdownNow();
+            try {
+                long timeRemaining = timeout.toMillis() - timeElapsed;
+                executor.awaitTermination(timeRemaining, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                isInterrupted = interruptable;
+            }
+            timeElapsed = clock.relativeTimeMillis() - startingTime;
+        } while (!executor.isTerminated() && !isInterrupted && timeElapsed < timeout.toMillis());
+        return executor.isTerminated();
+    }
+
     static void abortThread(Thread t) {

Review comment:
       Why is it not private?

##########
File path: flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java
##########
@@ -113,7 +113,7 @@ public boolean isTerminated() {
     }
 
     @Override
-    public boolean awaitTermination(long timeout, TimeUnit unit) {
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {

Review comment:
       That's an unnecessary change, isn't it?
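Assuming nothing in the test executor actually throws `InterruptedException`, the `throws` clause should not be needed to pass it to the new helpers. `ExecutorService#awaitTermination` already declares the exception, and a Java override may declare fewer checked exceptions than the method it overrides. Callers that only see the `ExecutorService` type still have to handle it. Below is a minimal illustration with hypothetical types (`Awaitable`, `ManualAwaitable`, `NarrowedThrowsDemo`).

```java
/** Hypothetical interface mirroring the shape of ExecutorService#awaitTermination. */
interface Awaitable {
    boolean awaitTermination(long timeoutMillis) throws InterruptedException;
}

/** Hypothetical test double: its override narrows the throws clause, which Java allows. */
class ManualAwaitable implements Awaitable {
    private boolean terminated;

    void terminate() {
        terminated = true;
    }

    @Override
    public boolean awaitTermination(long timeoutMillis) { // no "throws InterruptedException"
        return terminated;
    }
}

class NarrowedThrowsDemo {
    /** Through the concrete type, no try/catch is required. */
    static boolean viaConcreteType(ManualAwaitable awaitable) {
        return awaitable.awaitTermination(0L);
    }

    /** Through the interface type, the checked exception must still be handled. */
    static boolean viaInterface(Awaitable awaitable) {
        try {
            return awaitable.awaitTermination(0L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```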

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
##########
@@ -217,18 +207,11 @@ public void start() throws Exception {
     @Override
     public void close() throws Exception {
         LOG.info("Closing SourceCoordinator for source {}.", operatorName);
-        try {
-            if (started) {
-                context.close();
-                if (enumerator != null) {
-                    enumerator.close();
-                }
+        if (started) {
+            closeQuietly(context);

Review comment:
       Could we use `IOUtils.closeAll` instead? That way we would not lose the exception from `context.close()`.
   
   ```
   IOUtils.closeAll(context, enumerator);
   ```
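As far as I know, Flink's `IOUtils.closeAll` closes every non-null resource and rethrows the first exception, with later ones attached as suppressed. A sequence of `closeQuietly` calls would swallow them instead. The following is a hypothetical, JDK-only re-implementation of that behavior for illustration, not Flink's actual code. The class name `CloseAllSketch` is made up.

```java
class CloseAllSketch {

    /**
     * Closes every non-null resource in order. The first failure is remembered, later ones are
     * attached to it as suppressed, and it is rethrown only after all resources were closed.
     */
    static void closeAll(AutoCloseable... closeables) throws Exception {
        Exception first = null;
        for (AutoCloseable closeable : closeables) {
            if (closeable == null) {
                continue; // e.g. an enumerator that was never created
            }
            try {
                closeable.close();
            } catch (Exception e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }
}
```

So with `closeAll(context, enumerator)`, the enumerator is still closed when `context.close()` throws, and the caller sees the context's exception.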

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
##########
@@ -259,9 +265,9 @@ public void runInCoordinatorThread(Runnable runnable) {
     @Override
     public void close() throws InterruptedException {
         closed = true;
-        notifier.close();
-        coordinatorExecutor.shutdown();
-        coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+        // Close quietly so the closing sequence will be executed completely.
+        closeQuietly(notifier);

Review comment:
       Can we use `IOUtils.closeAll` here as well?
   
   ```
           IOUtils.closeAll(
                   notifier,
                   () ->
                           shutdownExecutorForcefully(
                                   coordinatorExecutor, Duration.ofMillis(Long.MAX_VALUE)));
   ```
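The lambda in the suggestion works because `AutoCloseable` has a single abstract method, `void close() throws Exception`, so the `boolean` returned by `shutdownExecutorForcefully` is simply discarded. The sketch below uses plain try-with-resources (not `IOUtils.closeAll`) to show the same guarantee with only the JDK: the executor is still shut down when the notifier's `close()` throws, and the notifier's exception is not lost. `LambdaCloseableSketch` and the failing notifier are hypothetical, and `shutdownNow()` plus `awaitTermination` stand in for `shutdownExecutorForcefully`.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

class LambdaCloseableSketch {

    /** Hypothetical notifier whose close() fails. */
    static AutoCloseable failingNotifier(List<String> closeLog) {
        return () -> {
            closeLog.add("notifier");
            throw new IllegalStateException("notifier failed");
        };
    }

    /** Returns the exception raised while closing, or null if everything closed cleanly. */
    static Exception closeNotifierThenExecutor(ExecutorService executor, List<String> closeLog) {
        // Resources are closed in reverse declaration order, so the executor adapter goes first.
        try (AutoCloseable executorShutdown =
                        () -> {
                            closeLog.add("executor");
                            executor.shutdownNow();
                            executor.awaitTermination(10, TimeUnit.SECONDS); // result ignored
                        };
                AutoCloseable notifier = failingNotifier(closeLog)) {
            // Nothing to do here; only the close sequence matters.
        } catch (Exception e) {
            return e;
        }
        return null;
    }
}
```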

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
##########
@@ -95,8 +100,94 @@ private ComponentClosingUtils() {}
         return future;
     }
 
+    /**
+     * A util method that tries to shut down an {@link ExecutorService} elegantly within the given
+     * timeout. If the executor has not been shut down before it hits timeout or the thread is
+     * interrupted when waiting for the termination, a forceful shutdown will be attempted on the
+     * executor.
+     *
+     * @param executor the {@link ExecutorService} to shut down.
+     * @param timeout the timeout duration.
+     * @return true if the given executor has been successfully closed, false otherwise.
+     */
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    public static boolean tryShutdownExecutorElegantly(ExecutorService executor, Duration timeout) {
+        try {
+            executor.shutdown();
+            executor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
+        } catch (InterruptedException ie) {
+            // Let it go.
+        }
+        if (!executor.isTerminated()) {
+            shutdownExecutorForcefully(executor, Duration.ZERO, false);
+        }
+        return executor.isTerminated();
+    }
+
+    /**
+     * Shutdown the given executor forcefully within the given timeout. The method returns if it is
+     * interrupted.
+     *
+     * @param executor the executor to shut down.
+     * @param timeout the timeout duration.
+     * @return true if the given executor is terminated, false otherwise.
+     */
+    public static boolean shutdownExecutorForcefully(ExecutorService executor, Duration timeout) {
+        return shutdownExecutorForcefully(executor, timeout, true);
+    }
+
+    /**
+     * Shutdown the given executor forcefully within the given timeout.
+     *
+     * @param executor the executor to shut down.
+     * @param timeout the timeout duration.
+     * @param interruptable when set to true, the method cannot be interrupted. Each interruption to
+     *     the thread results in another {@code ExecutorService.shutdownNow()} call to the shutting
+     *     down executor.
+     * @return true if the given executor is terminated, false otherwise.
+     */
+    public static boolean shutdownExecutorForcefully(

Review comment:
       Could we rewrite it using `Deadline`?
   
   ```
       public static boolean shutdownExecutorForcefully(
               ExecutorService executor, Duration timeout, boolean interruptable) {
           boolean isInterrupted = false;
           final Deadline deadline = Deadline.fromNowWithClock(timeout, clock);
           do {
               executor.shutdownNow();
               try {
                   executor.awaitTermination(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
               } catch (InterruptedException e) {
                   isInterrupted = interruptable;
               }
           } while (!executor.isTerminated() && !isInterrupted && deadline.hasTimeLeft());
           return executor.isTerminated();
       }
   ```
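Below is a self-contained sketch of the `Deadline`-based loop. `SimpleDeadline` is a hypothetical stand-in for Flink's `Deadline`, built on `System.nanoTime()` and exposing only `fromNow`, `timeLeft` and `hasTimeLeft`. One detail is handled deliberately: `Duration.ofMillis(Long.MAX_VALUE).toNanos()` throws `ArithmeticException`, and that timeout is passed by the `ExecutorNotifier` change and the `SourceCoordinatorContext` suggestion. The sketch therefore clamps overflowing durations, and it may be worth double-checking how the real `Deadline` handles such a timeout.

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

class DeadlineShutdownSketch {

    /** Hypothetical stand-in for Flink's Deadline, measured with System.nanoTime(). */
    static final class SimpleDeadline {
        private final long startNanos = System.nanoTime();
        private final long timeoutNanos;

        private SimpleDeadline(long timeoutNanos) {
            this.timeoutNanos = timeoutNanos;
        }

        static SimpleDeadline fromNow(Duration timeout) {
            long nanos;
            try {
                nanos = timeout.toNanos();
            } catch (ArithmeticException overflow) {
                nanos = Long.MAX_VALUE; // e.g. Duration.ofMillis(Long.MAX_VALUE)
            }
            return new SimpleDeadline(nanos);
        }

        private long nanosLeft() {
            // Elapsed time is never negative, so this subtraction cannot overflow.
            return timeoutNanos - (System.nanoTime() - startNanos);
        }

        Duration timeLeft() {
            return Duration.ofNanos(nanosLeft());
        }

        boolean hasTimeLeft() {
            return nanosLeft() > 0;
        }
    }

    static boolean shutdownExecutorForcefully(
            ExecutorService executor, Duration timeout, boolean interruptable) {
        SimpleDeadline deadline = SimpleDeadline.fromNow(timeout);
        boolean isInterrupted = false;
        do {
            executor.shutdownNow();
            try {
                // A non-positive time left makes awaitTermination return immediately.
                executor.awaitTermination(deadline.timeLeft().toNanos(), TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                isInterrupted = interruptable;
            }
        } while (!executor.isTerminated() && !isInterrupted && deadline.hasTimeLeft());
        return executor.isTerminated();
    }
}
```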

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
##########
@@ -95,8 +100,94 @@ private ComponentClosingUtils() {}
         return future;
     }
 
+    /**
+     * A util method that tries to shut down an {@link ExecutorService} elegantly within the given
+     * timeout. If the executor has not been shut down before it hits timeout or the thread is
+     * interrupted when waiting for the termination, a forceful shutdown will be attempted on the
+     * executor.
+     *
+     * @param executor the {@link ExecutorService} to shut down.
+     * @param timeout the timeout duration.
+     * @return true if the given executor has been successfully closed, false otherwise.
+     */
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    public static boolean tryShutdownExecutorElegantly(ExecutorService executor, Duration timeout) {
+        try {
+            executor.shutdown();
+            executor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
+        } catch (InterruptedException ie) {
+            // Let it go.
+        }
+        if (!executor.isTerminated()) {
+            shutdownExecutorForcefully(executor, Duration.ZERO, false);
+        }
+        return executor.isTerminated();
+    }
+
+    /**
+     * Shutdown the given executor forcefully within the given timeout. The method returns if it is
+     * interrupted.
+     *
+     * @param executor the executor to shut down.
+     * @param timeout the timeout duration.
+     * @return true if the given executor is terminated, false otherwise.
+     */
+    public static boolean shutdownExecutorForcefully(ExecutorService executor, Duration timeout) {
+        return shutdownExecutorForcefully(executor, timeout, true);
+    }
+
+    /**
+     * Shutdown the given executor forcefully within the given timeout.
+     *
+     * @param executor the executor to shut down.
+     * @param timeout the timeout duration.
+     * @param interruptable when set to true, the method cannot be interrupted. Each interruption to
+     *     the thread results in another {@code ExecutorService.shutdownNow()} call to the shutting
+     *     down executor.
+     * @return true if the given executor is terminated, false otherwise.
+     */
+    public static boolean shutdownExecutorForcefully(
+            ExecutorService executor, Duration timeout, boolean interruptable) {
+        long startingTime = clock.relativeTimeMillis();
+        long timeElapsed = 0L;
+        boolean isInterrupted = false;
+        do {
+            executor.shutdownNow();
+            try {
+                long timeRemaining = timeout.toMillis() - timeElapsed;
+                executor.awaitTermination(timeRemaining, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                isInterrupted = interruptable;
+            }
+            timeElapsed = clock.relativeTimeMillis() - startingTime;
+        } while (!executor.isTerminated() && !isInterrupted && timeElapsed < timeout.toMillis());

Review comment:
       nit: How about we change the order? I guess `executor.isTerminated()` is the most expensive operation, and we do not need to execute it if the other conditions are not met.
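Since `&&` evaluates left to right and stops at the first `false`, and all three checks are free of side effects, reordering does not change the loop's result. It only changes how often `isTerminated()` runs. Whether that call is really the most expensive check depends on the `ExecutorService` implementation. Below is a small sketch with a call-counting `BooleanSupplier` standing in for `executor.isTerminated()`. All names in it are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

class ConditionOrderSketch {

    /** Original order: the termination check always runs first. */
    static boolean continueOriginal(
            BooleanSupplier isTerminated, boolean isInterrupted, boolean hasTimeLeft) {
        return !isTerminated.getAsBoolean() && !isInterrupted && hasTimeLeft;
    }

    /** Suggested order: the cheap flags go first, so the termination check can be skipped. */
    static boolean continueReordered(
            BooleanSupplier isTerminated, boolean isInterrupted, boolean hasTimeLeft) {
        return !isInterrupted && hasTimeLeft && !isTerminated.getAsBoolean();
    }

    /** Wraps a constant answer and counts how often it is asked for. */
    static BooleanSupplier counting(boolean answer, AtomicInteger calls) {
        return () -> {
            calls.incrementAndGet();
            return answer;
        };
    }
}
```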




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
