patricklucas commented on code in PR #22987:
URL: https://github.com/apache/flink/pull/22987#discussion_r1263527019


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java:
##########
@@ -501,6 +501,22 @@ private <P extends ResponseBody> CompletableFuture<P> submitRequest(
                     }
                 });
 
+        // [FLINK-32583] If connectFuture failed instantly but channelFuture is unresolved, it may
+        // mean the executor service Netty is using has shut down, in which case the above listener
+        // to complete channelFuture will never run
+        if (connectFuture.isDone() && !connectFuture.isSuccess() && !channelFuture.isDone()) {

Review Comment:
   While trying to get a better picture of the various cases we might need to handle for full coverage here, I took a step back to get a broader view.
   
   I verified that the event loop/executor for both the connection and its listeners is the same one: the NioEventLoopGroup that `bootstrap` is configured with elsewhere in RestClient. I also saw that when the client is shut down, this group is asked to shut down gracefully with a "quiet period" of 0 and a default timeout of 10 seconds. As far as I can tell, already-scheduled tasks are allowed to run to completion.
   
   I believe we could cover the vast majority of cases by simply checking `isRunning` at the top of `submitRequest`: if the client has already been closed and graceful shutdown of the group initiated, we should immediately reject any new requests.
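   
   A minimal sketch of that guard, assuming an `AtomicBoolean isRunning` lifecycle field like the one the comment refers to; the names here are illustrative, not the final code:
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.atomic.AtomicBoolean;
   
   class ClosedGuardSketch {
       private final AtomicBoolean isRunning = new AtomicBoolean(true);
   
       <P> CompletableFuture<P> submitRequest(/* request args elided */) {
           // Reject immediately if close() has already initiated graceful
           // shutdown of the event loop group; listeners scheduled after that
           // point may never run.
           if (!isRunning.get()) {
               CompletableFuture<P> rejected = new CompletableFuture<>();
               rejected.completeExceptionally(
                       new IllegalStateException("RestClient is already closed"));
               return rejected;
           }
           // ... existing connect/submit logic ...
           return new CompletableFuture<>();
       }
   }
   ```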
   
   The particular challenge is covering the case where the client is shut down after the call to `bootstrap.connect()` but before the listeners are notified. Depending on the configured timeouts, the connect task may run to completion or time out, yet never be able to notify its listeners and thus never resolve `channelFuture`. I don't think we have any way to detect this within `submitRequest` itself.
   
   The only solution that comes to mind here would be to:
   - Add each `channelFuture` created in `submitRequest` to a class-level 
collection
   - In the listener attached to `connectFuture`, remove the `channelFuture` 
from that collection after calling `complete`/`completeExceptionally`
   - In the listener in `shutdownInternally` that is notified when the event 
loop shuts down, resolve all remaining futures in the collection exceptionally
   
   Having written it down, it doesn't sound as complex as I first thought, but it's still clearly beyond what I had originally hoped for this change.
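   
   A rough sketch of that bookkeeping, with hypothetical names (`pendingResponses`, `registerPending`, `failPendingOnShutdown`) standing in for whatever fits RestClient's structure:
   
   ```java
   import java.util.Set;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;
   
   class PendingFutureCleanupSketch {
       // Class-level collection of channelFutures still waiting on their
       // connectFuture listener.
       private final Set<CompletableFuture<?>> pendingResponses =
               ConcurrentHashMap.newKeySet();
   
       // Called from submitRequest right after creating channelFuture.
       void registerPending(CompletableFuture<?> channelFuture) {
           pendingResponses.add(channelFuture);
           // The connectFuture listener completes channelFuture one way or the
           // other; once that happens, drop it from the collection.
           channelFuture.whenComplete(
                   (result, error) -> pendingResponses.remove(channelFuture));
       }
   
       // Called from the listener in shutdownInternally once the event loop
       // group has terminated; resolves anything still outstanding.
       void failPendingOnShutdown() {
           for (CompletableFuture<?> future : pendingResponses) {
               // No-op for futures that already completed normally.
               future.completeExceptionally(
                       new IllegalStateException(
                               "RestClient closed before the request could complete"));
           }
           pendingResponses.clear();
       }
   }
   ```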
   
   @XComp Thoughts? Should I simply check whether the client has already been closed at the top of `submitRequest`, implement this "future cleanup" logic, or do both?


