
Till Rohrmann commented on FLINK-18663:

Thanks for providing the logs [~tartarus]. There is some part missing which 
shows under which address the {{Dispatcher}} is being registered. Would it be 
possible to also post this part. The reason I'm asking is because I want to 
understand whether there was a failover happening and whether 
{{Actor[akka://flink/user/dispatcher#-88418157}} is the running {{Dispatcher}}. 
It is a bit odd that all the requests time out with an {{AskTimeoutException}} 
if it is the running {{Dispatcher}}.

It is true that we can get rid of the symptoms of the problem by passing in the 
{{maxContentLength}} but I would also like to understand the underlying 
problem. Have you tried what I proposed to add above? Concretely checking under 
{{lock}} whether {{terminationFuture}} has been set at the beginning of 
{{respondAsLeader}}? If it has been set, then this means that the handler is 
shutting down and we should ignore this request. Moreover, the 
{{inFlightRequestTracker.registerRequest();}} should also happen under {{lock}}.

> AbstractHandler throw NPE cause by FlinkHttpObjectAggregator is null
> when rest throw exception, it will do this code
> {code:java}
> private CompletableFuture<Void> handleException(Throwable throwable, 
> ChannelHandlerContext ctx, HttpRequest httpRequest) {
>       FlinkHttpObjectAggregator flinkHttpObjectAggregator = 
> ctx.pipeline().get(FlinkHttpObjectAggregator.class);
>       int maxLength = flinkHttpObjectAggregator.maxContentLength() - 
>       if (throwable instanceof RestHandlerException) {
>               RestHandlerException rhe = (RestHandlerException) throwable;
>               String stackTrace = ExceptionUtils.stringifyException(rhe);
>               String truncatedStackTrace = Ascii.truncate(stackTrace, 
> maxLength, "...");
>               if (log.isDebugEnabled()) {
>                       log.error("Exception occurred in REST handler.", rhe);
>               } else {
>                       log.error("Exception occurred in REST handler: {}", 
> rhe.getMessage());
>               }
>               return HandlerUtils.sendErrorResponse(
>                       ctx,
>                       httpRequest,
>                       new ErrorResponseBody(truncatedStackTrace),
>                       rhe.getHttpResponseStatus(),
>                       responseHeaders);
>       } else {
>               log.error("Unhandled exception.", throwable);
>               String stackTrace = String.format("<Exception on server 
> side:%n%s%nEnd of exception on server side>",
>                       ExceptionUtils.stringifyException(throwable));
>               String truncatedStackTrace = Ascii.truncate(stackTrace, 
> maxLength, "...");
>               return HandlerUtils.sendErrorResponse(
>                       ctx,
>                       httpRequest,
>                       new ErrorResponseBody(Arrays.asList("Internal server 
> error.", truncatedStackTrace)),
>                       HttpResponseStatus.INTERNAL_SERVER_ERROR,
>                       responseHeaders);
>       }
> }
> {code}
> but flinkHttpObjectAggregator some case is null,so this will throw NPE,but 
> this method called by  AbstractHandler#respondAsLeader
> {code:java}
> requestProcessingFuture
>       .whenComplete((Void ignored, Throwable throwable) -> {
>               if (throwable != null) {
> handleException(ExceptionUtils.stripCompletionException(throwable), ctx, 
> httpRequest)
>                               .whenComplete((Void ignored2, Throwable 
> throwable2) -> finalizeRequestProcessing(finalUploadedFiles));
>               } else {
>                       finalizeRequestProcessing(finalUploadedFiles);
>               }
>       });
> {code}
>  the result is InFlightRequestTracker Cannot be cleared.
> so the CompletableFuture does‘t complete that handler's closeAsync returned
> !C49A7310-F932-451B-A203-6D17F3140C0D.png!
> !e18e00dd6664485c2ff55284fe969474.png!

