zentol commented on a change in pull request #13020:
URL: https://github.com/apache/flink/pull/13020#discussion_r462404713



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
##########
@@ -114,6 +115,21 @@ protected void respondAsLeader(ChannelHandlerContext ctx, 
RoutedRequest routedRe
                        log.trace("Received request " + httpRequest.uri() + 
'.');
                }
 
+               synchronized (this) {

Review comment:
       ```suggestion
                synchronized (lock) {
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
##########
@@ -114,6 +115,21 @@ protected void respondAsLeader(ChannelHandlerContext ctx, 
RoutedRequest routedRe
                        log.trace("Received request " + httpRequest.uri() + 
'.');
                }
 
+               synchronized (this) {
+                       if (terminationFuture != null) {
+                               String errorMsg = "The handler instance for " + 
untypedResponseMessageHeaders.getTargetRestEndpointURL()
+                                       + " had already been closed";
+                               log.warn(errorMsg);
+                               HandlerUtils.sendErrorResponse(
+                                       ctx,
+                                       httpRequest,
+                                       new ErrorResponseBody(errorMsg),
+                                       HttpResponseStatus.BAD_REQUEST,
+                                       responseHeaders);
+                               return;
+                       }
+               }
+
                FileUploads uploadedFiles = null;
                try {
                        inFlightRequestTracker.registerRequest();

Review comment:
       this also needs to happen under the lock, otherwise the race condition 
isn't fixed. (just move the entire synchronized block here)

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
##########
@@ -114,6 +115,21 @@ protected void respondAsLeader(ChannelHandlerContext ctx, 
RoutedRequest routedRe
                        log.trace("Received request " + httpRequest.uri() + 
'.');
                }
 
+               synchronized (this) {
+                       if (terminationFuture != null) {
+                               String errorMsg = "The handler instance for " + 
untypedResponseMessageHeaders.getTargetRestEndpointURL()
+                                       + " had already been closed";
+                               log.warn(errorMsg);
+                               HandlerUtils.sendErrorResponse(

Review comment:
       We should not try to submit a response; since we are either already 
shutdown or in the process of shutting down the response may or may not go 
through. Better to be consistent and not respond in either case.
   One thing we can do though is close the channel via `ctx.channel().close()`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to