[ https://issues.apache.org/jira/browse/FLINK-18663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17166467#comment-17166467 ]
Chesnay Schepler commented on FLINK-18663: ------------------------------------------ So far the only explanation that I could find for a pipeline returning null is that the channel was already closed. Our current assumption was that this happened because the RestServerEndpoint is shutting down. From the logs you gave us it appears that the shutdown is initiated 2 minutes after the NPE; this doesn't seem to match our assumption. I'm wondering what would happen if the netty connection were to be closed by the client. We know that the request processing is delayed by 10 seconds; if the client aborts the connection in between maybe netty starts cleaning up the pipeline. > Fix Flink On YARN AM not exit > ----------------------------- > > Key: FLINK-18663 > URL: https://issues.apache.org/jira/browse/FLINK-18663 > Project: Flink > Issue Type: Bug > Components: Runtime / REST > Affects Versions: 1.10.0, 1.10.1, 1.11.0 > Reporter: tartarus > Assignee: tartarus > Priority: Critical > Labels: pull-request-available > Attachments: 110.png, 111.png, > C49A7310-F932-451B-A203-6D17F3140C0D.png, > e18e00dd6664485c2ff55284fe969474.png, jobmanager.log.noyarn.tar.gz > > > AbstractHandler throw NPE cause by FlinkHttpObjectAggregator is null > when rest throw exception, it will do this code > {code:java} > private CompletableFuture<Void> handleException(Throwable throwable, > ChannelHandlerContext ctx, HttpRequest httpRequest) { > FlinkHttpObjectAggregator flinkHttpObjectAggregator = > ctx.pipeline().get(FlinkHttpObjectAggregator.class); > int maxLength = flinkHttpObjectAggregator.maxContentLength() - > OTHER_RESP_PAYLOAD_OVERHEAD; > if (throwable instanceof RestHandlerException) { > RestHandlerException rhe = (RestHandlerException) throwable; > String stackTrace = ExceptionUtils.stringifyException(rhe); > String truncatedStackTrace = Ascii.truncate(stackTrace, > maxLength, "..."); > if (log.isDebugEnabled()) { > log.error("Exception occurred in REST handler.", rhe); > } else { > log.error("Exception occurred in REST handler: {}", > rhe.getMessage()); > } > return HandlerUtils.sendErrorResponse( > ctx, > httpRequest, > new ErrorResponseBody(truncatedStackTrace), > rhe.getHttpResponseStatus(), > responseHeaders); > } else { > log.error("Unhandled exception.", throwable); > String stackTrace = String.format("<Exception on server > side:%n%s%nEnd of exception on server side>", > ExceptionUtils.stringifyException(throwable)); > String truncatedStackTrace = Ascii.truncate(stackTrace, > maxLength, "..."); > return HandlerUtils.sendErrorResponse( > ctx, > httpRequest, > new ErrorResponseBody(Arrays.asList("Internal server > error.", truncatedStackTrace)), > HttpResponseStatus.INTERNAL_SERVER_ERROR, > responseHeaders); > } > } > {code} > but flinkHttpObjectAggregator some case is null,so this will throw NPE,but > this method called by AbstractHandler#respondAsLeader > {code:java} > requestProcessingFuture > .whenComplete((Void ignored, Throwable throwable) -> { > if (throwable != null) { > > handleException(ExceptionUtils.stripCompletionException(throwable), ctx, > httpRequest) > .whenComplete((Void ignored2, Throwable > throwable2) -> finalizeRequestProcessing(finalUploadedFiles)); > } else { > finalizeRequestProcessing(finalUploadedFiles); > } > }); > {code} > the result is InFlightRequestTracker Cannot be cleared. > so the CompletableFuture does‘t complete that handler's closeAsync returned > !C49A7310-F932-451B-A203-6D17F3140C0D.png! > !e18e00dd6664485c2ff55284fe969474.png! > -- This message was sent by Atlassian Jira (v8.3.4#803005)